Permissions are now checked as routing moves between controllers

.pecan metadata renamed to ._pecan
This commit is contained in:
Mark McClain
2011-01-14 21:40:23 -05:00
parent ed5c644779
commit 6c053856f3
6 changed files with 254 additions and 95 deletions

View File

@@ -87,15 +87,10 @@ class Pecan(object):
node, remainder = lookup_controller(node, path)
return node, remainder
def handle_security(self, controller):
if controller.pecan.get('secured', False):
if not controller.pecan['check_permissions']():
raise exc.HTTPUnauthorized
def determine_hooks(self, controller=None):
controller_hooks = []
if controller:
controller_hooks = controller.pecan.get('hooks', [])
controller_hooks = controller._pecan.get('hooks', [])
return list(
sorted(
chain(controller_hooks, self.hooks),
@@ -175,14 +170,14 @@ class Pecan(object):
state.content_type = self.get_content_type(format)
controller, remainder = self.route(self.root, path)
if controller.pecan.get('generic_handler'):
if controller._pecan.get('generic_handler'):
raise exc.HTTPNotFound
# handle generic controllers
im_self = None
if controller.pecan.get('generic'):
if controller._pecan.get('generic'):
im_self = controller.im_self
handlers = controller.pecan['generic_handlers']
handlers = controller._pecan['generic_handlers']
controller = handlers.get(request.method, handlers['DEFAULT'])
# add the controller to the state so that hooks can use it
@@ -190,37 +185,34 @@ class Pecan(object):
# if unsure ask the controller for the default content type
if state.content_type is None:
state.content_type = controller.pecan.get('content_type', 'text/html')
state.content_type = controller._pecan.get('content_type', 'text/html')
# get a sorted list of hooks, by priority
state.hooks = self.determine_hooks(controller)
# handle "before" hooks
self.handle_hooks('before', state)
# handle security
self.handle_security(controller)
# fetch and validate any parameters
params = dict(state.request.str_params)
if 'schema' in controller.pecan:
if 'schema' in controller._pecan:
request.validation_error = None
try:
params = self.validate(
controller.pecan['schema'],
json = controller.pecan['validate_json'],
controller._pecan['schema'],
json = controller._pecan['validate_json'],
params = params
)
except Invalid, e:
request.validation_error = e
if controller.pecan['error_handler'] is not None:
redirect(controller.pecan['error_handler'], internal=True)
if controller.pecan['validate_json']: params = dict(data=params)
if controller._pecan['error_handler'] is not None:
redirect(controller._pecan['error_handler'], internal=True)
if controller._pecan['validate_json']: params = dict(data=params)
# fetch the arguments for the controller
args, kwargs = self.get_args(
params,
remainder,
controller.pecan['argspec'],
controller._pecan['argspec'],
im_self
)
@@ -235,7 +227,7 @@ class Pecan(object):
raw_namespace = result
# pull the template out based upon content type and handle overrides
template = controller.pecan.get('content_types', {}).get(state.content_type)
template = controller._pecan.get('content_types', {}).get(state.content_type)
template = getattr(request, 'override_template', template)
# if there is a template, render it

View File

@@ -1,8 +1,8 @@
from inspect import getargspec
def _cfg(f):
if not hasattr(f, 'pecan'): f.pecan = {}
return f.pecan
if not hasattr(f, '_pecan'): f._pecan = {}
return f._pecan
def when_for(controller):
@@ -10,7 +10,7 @@ def when_for(controller):
def decorate(f):
expose(**kw)(f)
_cfg(f)['generic_handler'] = True
controller.pecan['generic_handlers'][method.upper()] = f
controller._pecan['generic_handlers'][method.upper()] = f
return f
return decorate
return when
@@ -61,4 +61,4 @@ def transactional(ignore_redirects=True):
wrap.__transactional__ = True
wrap.__transactional_ignore_redirects__ = ignore_redirects
return wrap
return deco
return deco

View File

@@ -17,7 +17,7 @@ def walk_controller(root_class, controller, hooks):
if iscontroller(value):
for hook in hooks:
value.pecan.setdefault('hooks', []).append(hook)
value._pecan.setdefault('hooks', []).append(hook)
elif hasattr(value, '__class__'):
if name.startswith('__') and name.endswith('__'): continue
walk_controller(root_class, value, hooks)

View File

@@ -1,12 +1,37 @@
from webob import exc
from inspect import ismethod, isfunction
STOP_NOW = False
__all__ = ['lookup_controller', 'find_object']
def handle_security(controller):
if controller._pecan.get('secured', False):
if not controller._pecan['check_permissions']():
raise exc.HTTPUnauthorized
def cross_boundary(prev_obj, obj):
"""
check the security as we move across a boundary
"""
if prev_obj is None:
return
meta = getattr(prev_obj, '_pecan', {})
if meta.get('secured', False):
if obj not in meta.get('unlocked', []):
if not meta['check_permissions']():
raise exc.HTTPUnauthorized
def lookup_controller(obj, url_path):
remainder = url_path
notfound_handlers = []
while True:
try:
obj, remainder = find_object(obj, remainder, notfound_handlers)
handle_security(obj)
return obj, remainder
except exc.HTTPNotFound:
while notfound_handlers:
@@ -21,7 +46,10 @@ def lookup_controller(obj, url_path):
try:
result = obj(*remainder)
if result:
prev_obj = obj
obj, remainder = result
# crossing controller boundary
cross_boundary(prev_obj, obj)
break
except TypeError, te:
print 'Got exception calling lookup(): %s (%s)' % (te, te.args)
@@ -30,9 +58,13 @@ def lookup_controller(obj, url_path):
def find_object(obj, remainder, notfound_handlers):
prev_obj = None
while True:
if obj is None: raise exc.HTTPNotFound
if iscontroller(obj): return obj, remainder
# are we traversing to another controller
cross_boundary(prev_obj, obj)
if remainder and remainder[0] == '':
index = getattr(obj, 'index', None)
@@ -57,8 +89,8 @@ def find_object(obj, remainder, notfound_handlers):
if not remainder: raise exc.HTTPNotFound
next, remainder = remainder[0], remainder[1:]
prev_obj = obj
obj = getattr(obj, next, None)
def iscontroller(obj):
return getattr(obj, 'exposed', False)
return getattr(obj, 'exposed', False)

View File

@@ -1,80 +1,73 @@
from inspect import getmembers, ismethod
from inspect import getmembers, ismethod, isfunction
from decorators import _cfg
from routing import iscontroller
__all__ = ['Any', 'Protected', 'unlock', 'secure']
__all__ = ['secure', 'unlocked', 'SecureController']
class _Unlocked(object):
"""
A wrapper class to declare a class as unlocked inside of a SecureController
"""
def __init__(self, obj):
self.obj = obj
def unlocked(func):
if not hasattr(func, 'pecan'): func.pecan = {}
func.pecan['unlocked'] = True
return func
class _SecureState(object):
def __init__(self, desc, boolean_value):
self.description = desc
self.boolean_value = boolean_value
def __repr__(self):
return '<SecureState %s>' % self.description
def __nonzero__(self):
return self.boolean_value
Any = _SecureState('Any', False)
Protected = _SecureState('Protected', True)
def unlocked(func_or_obj):
if ismethod(func_or_obj) or isfunction(func_or_obj):
_cfg(func_or_obj)['secured'] = Any
return func_or_obj
else:
return _Unlocked(func_or_obj)
def secure(check_permissions):
def wrap(func):
if not hasattr(func, 'pecan'): func.pecan = {}
func.pecan['secured'] = True
func.pecan['check_permissions'] = check_permissions
cfg = _cfg(func)
cfg['secured'] = Protected
cfg['check_permissions'] = check_permissions
return func
return wrap
def walk_controller(root_class, controller):
if hasattr(controller, '_lookup'):
# TODO: what about this?
controller._check_security = root_class._perform_validation
if not isinstance(controller, (int, dict)):
for name, value in getmembers(controller):
if name == 'controller': continue
if ismethod(value):
if iscontroller(value) and not value.pecan.get('unlocked', False):
value.pecan['secured'] = True
value.pecan['check_permissions'] = root_class.check_permissions
elif hasattr(value, '__class__'):
if name.startswith('__') and name.endswith('__'): continue
walk_controller(root_class, value)
class SecureController(object):
"""
Used to apply security to a controller and its children.
Used to apply security to a controller.
Implementations of SecureController should extend the
`check_permissions` method to return a True or False
value (depending on whether or not the user has access
value (depending on whether or not the user has permissions
to the controller).
"""
class __metaclass__(type):
def __init__(cls, name, bases, dict_):
walk_controller(cls, cls)
@classmethod
def check_permissions(cls):
return True
class UnlockedControllerMeta(type):
"""
Can be used to force (override) a controller and all of its
subcontrollers to be unlocked/unsecured.
This has the same effect as applying @pecan.secure.unlocked
to every method in the class and its subclasses.
"""
def __init__(cls, name, bases, ns):
cls.walk_and_apply_unlocked(cls, cls)
def walk_and_apply_unlocked(cls, root_class, controller):
if not isinstance(controller, (int, dict)):
for name, value in getmembers(controller):
if name == 'controller': continue
cls._pecan = dict(secured=Protected, check_permissions=cls.check_permissions, unlocked=[])
for name, value in getmembers(cls):
if ismethod(value):
if iscontroller(value):
value = unlocked(value)
if iscontroller(value) and value._pecan.get('secured') is not Any:
value._pecan['secured'] = Protected
value._pecan['check_permissions'] = cls.check_permissions
elif hasattr(value, '__class__'):
if name.startswith('__') and name.endswith('__'): continue
cls.walk_and_apply_unlocked(root_class, value)
if isinstance(value, _Unlocked):
# mark it as unlocked and remove wrapper
cls._pecan['unlocked'].append(value.obj)
setattr(cls, name, value.obj)
@classmethod
def check_permissions(cls):
return False

View File

@@ -1,9 +1,15 @@
from unittest import TestCase
from pecan import expose, make_app
from pecan.secure import secure, unlocked, SecureController, UnlockedControllerMeta
from pecan.secure import secure, unlocked, SecureController, Protected
from webtest import TestApp
try:
set()
except:
from sets import Set as set
class TestSecure(object):
def test_simple_secure(self):
authorized = False
@@ -58,12 +64,8 @@ class TestSecure(object):
assert response.status_int == 200
assert response.body == 'Allowed!'
def test_unlocked_meta(self):
def test_unlocked_attribute(self):
class AuthorizedSubController(object):
__metaclass__ = UnlockedControllerMeta
@expose()
def index(self):
return 'Index'
@@ -82,11 +84,7 @@ class TestSecure(object):
def allowed(self):
return 'Allowed!'
@classmethod
def check_permissions(self):
return False
authorized = AuthorizedSubController()
authorized = unlocked(AuthorizedSubController())
class RootController(object):
@expose()
@@ -131,4 +129,148 @@ class TestSecure(object):
response = app.get('/secret/authorized/allowed')
assert response.status_int == 200
assert response.body == 'Allowed!'
assert response.body == 'Allowed!'
def test_state_attribute(self):
from pecan.secure import Any, Protected
assert repr(Any) == '<SecureState Any>'
assert bool(Any) is False
assert repr(Protected) == '<SecureState Protected>'
assert bool(Protected) is True
class TestObjectPathSecurity(TestCase):
def setUp(self):
permissions_checked = getattr(self, 'permissions_checked', set())
class DeepSecretController(SecureController):
authorized = False
@expose()
@unlocked
def _lookup(self, someID, *remainder):
if someID == 'notfound':
return None
return SubController(someID), remainder
@expose()
def index(self):
return 'Deep Secret'
@classmethod
def check_permissions(self):
permissions_checked.add('deepsecret')
return self.authorized
deepsecret_instance = DeepSecretController()
class SubController(object):
def __init__(self, myID):
self.myID = myID
@expose()
def index(self):
return 'Index %s' % self.myID
deepsecret = DeepSecretController()
class SecretController(SecureController):
authorized = False
@expose()
def _lookup(self, someID, *remainder):
if someID == 'notfound':
return None
return SubController(someID), remainder
@classmethod
def check_permissions(self):
permissions_checked.add('secretcontroller')
return self.authorized
class NotSecretController(object):
@expose()
def _lookup(self, someID, *remainder):
if someID == 'notfound':
return None
return SubController(someID), remainder
class RootController(object):
secret = SecretController()
notsecret = NotSecretController()
self.deepsecret_cls = DeepSecretController
self.secret_cls = SecretController
self.permissions_checked = permissions_checked
self.app = TestApp(make_app(RootController(), static_root='tests/static'))
def tearDown(self):
self.permissions_checked.clear()
self.secret_cls.authorized = False
self.deepsecret_cls.authorized = False
def test_sub_of_both_not_secret(self):
response = self.app.get('/notsecret/hi/')
assert response.status_int == 200
assert response.body == 'Index hi'
def test_protected_lookup(self):
response = self.app.get('/secret/hi/', expect_errors=True)
assert response.status_int == 401
self.secret_cls.authorized = True
response = self.app.get('/secret/hi/')
assert response.status_int == 200
assert response.body == 'Index hi'
assert 'secretcontroller' in self.permissions_checked
def test_secured_notfound_lookup(self):
response = self.app.get('/secret/notfound/', expect_errors=True)
assert response.status_int == 404
def test_secret_through_lookup(self):
response = self.app.get('/notsecret/hi/deepsecret/', expect_errors=True)
assert response.status_int == 401
def test_layered_protection(self):
response = self.app.get('/secret/hi/deepsecret/', expect_errors=True)
assert response.status_int == 401
assert 'secretcontroller' in self.permissions_checked
self.secret_cls.authorized = True
response = self.app.get('/secret/hi/deepsecret/', expect_errors=True)
assert response.status_int == 401
assert 'secretcontroller' in self.permissions_checked
assert 'deepsecret' in self.permissions_checked
self.deepsecret_cls.authorized = True
response = self.app.get('/secret/hi/deepsecret/')
assert response.status_int == 200
assert response.body == 'Deep Secret'
assert 'secretcontroller' in self.permissions_checked
assert 'deepsecret' in self.permissions_checked
def test_cyclical_protection(self):
self.secret_cls.authorized = True
self.deepsecret_cls.authorized = True
response = self.app.get('/secret/1/deepsecret/2/deepsecret/')
assert response.status_int == 200
assert response.body == 'Deep Secret'
assert 'secretcontroller' in self.permissions_checked
assert 'deepsecret' in self.permissions_checked
def test_unlocked_lookup(self):
response = self.app.get('/notsecret/1/deepsecret/2/')
assert response.status_int == 200
assert response.body == 'Index 2'
assert 'deepsecret' not in self.permissions_checked
response = self.app.get('/notsecret/1/deepsecret/notfound/', expect_errors=True)
assert response.status_int == 404
assert 'deepsecret' not in self.permissions_checked
def test_mixed_protection(self):
self.secret_cls.authorized = True
response = self.app.get('/secret/1/deepsecret/notfound/', expect_errors=True)
assert response.status_int == 404
assert 'secretcontroller' in self.permissions_checked
assert 'deepsecret' not in self.permissions_checked