diff --git a/pecan/core.py b/pecan/core.py index 55ce727..716df44 100644 --- a/pecan/core.py +++ b/pecan/core.py @@ -87,15 +87,10 @@ class Pecan(object): node, remainder = lookup_controller(node, path) return node, remainder - def handle_security(self, controller): - if controller.pecan.get('secured', False): - if not controller.pecan['check_permissions'](): - raise exc.HTTPUnauthorized - def determine_hooks(self, controller=None): controller_hooks = [] if controller: - controller_hooks = controller.pecan.get('hooks', []) + controller_hooks = controller._pecan.get('hooks', []) return list( sorted( chain(controller_hooks, self.hooks), @@ -175,14 +170,14 @@ class Pecan(object): state.content_type = self.get_content_type(format) controller, remainder = self.route(self.root, path) - if controller.pecan.get('generic_handler'): + if controller._pecan.get('generic_handler'): raise exc.HTTPNotFound # handle generic controllers im_self = None - if controller.pecan.get('generic'): + if controller._pecan.get('generic'): im_self = controller.im_self - handlers = controller.pecan['generic_handlers'] + handlers = controller._pecan['generic_handlers'] controller = handlers.get(request.method, handlers['DEFAULT']) # add the controller to the state so that hooks can use it @@ -190,37 +185,34 @@ class Pecan(object): # if unsure ask the controller for the default content type if state.content_type is None: - state.content_type = controller.pecan.get('content_type', 'text/html') + state.content_type = controller._pecan.get('content_type', 'text/html') # get a sorted list of hooks, by priority state.hooks = self.determine_hooks(controller) # handle "before" hooks self.handle_hooks('before', state) - # handle security - self.handle_security(controller) - # fetch and validate any parameters params = dict(state.request.str_params) - if 'schema' in controller.pecan: + if 'schema' in controller._pecan: request.validation_error = None try: params = self.validate( - controller.pecan['schema'], - json = controller.pecan['validate_json'], + controller._pecan['schema'], + json = controller._pecan['validate_json'], params = params ) except Invalid, e: request.validation_error = e - if controller.pecan['error_handler'] is not None: - redirect(controller.pecan['error_handler'], internal=True) - if controller.pecan['validate_json']: params = dict(data=params) + if controller._pecan['error_handler'] is not None: + redirect(controller._pecan['error_handler'], internal=True) + if controller._pecan['validate_json']: params = dict(data=params) # fetch the arguments for the controller args, kwargs = self.get_args( params, remainder, - controller.pecan['argspec'], + controller._pecan['argspec'], im_self ) @@ -235,7 +227,7 @@ class Pecan(object): raw_namespace = result # pull the template out based upon content type and handle overrides - template = controller.pecan.get('content_types', {}).get(state.content_type) + template = controller._pecan.get('content_types', {}).get(state.content_type) template = getattr(request, 'override_template', template) # if there is a template, render it diff --git a/pecan/decorators.py b/pecan/decorators.py index 18c4e31..3a9164a 100644 --- a/pecan/decorators.py +++ b/pecan/decorators.py @@ -1,8 +1,8 @@ from inspect import getargspec def _cfg(f): - if not hasattr(f, 'pecan'): f.pecan = {} - return f.pecan + if not hasattr(f, '_pecan'): f._pecan = {} + return f._pecan def when_for(controller): @@ -10,7 +10,7 @@ def when_for(controller): def decorate(f): expose(**kw)(f) _cfg(f)['generic_handler'] = True - controller.pecan['generic_handlers'][method.upper()] = f + controller._pecan['generic_handlers'][method.upper()] = f return f return decorate return when @@ -61,4 +61,4 @@ def transactional(ignore_redirects=True): wrap.__transactional__ = True wrap.__transactional_ignore_redirects__ = ignore_redirects return wrap - return deco \ No newline at end of file + return deco diff --git a/pecan/hooks.py b/pecan/hooks.py index aa061ef..9d64c39 100644 --- a/pecan/hooks.py +++ b/pecan/hooks.py @@ -17,7 +17,7 @@ def walk_controller(root_class, controller, hooks): if iscontroller(value): for hook in hooks: - value.pecan.setdefault('hooks', []).append(hook) + value._pecan.setdefault('hooks', []).append(hook) elif hasattr(value, '__class__'): if name.startswith('__') and name.endswith('__'): continue walk_controller(root_class, value, hooks) diff --git a/pecan/routing.py b/pecan/routing.py index e9ca5f0..ab388a7 100644 --- a/pecan/routing.py +++ b/pecan/routing.py @@ -1,12 +1,37 @@ from webob import exc +from inspect import ismethod, isfunction +STOP_NOW = False + +__all__ = ['lookup_controller', 'find_object'] + +def handle_security(controller): + if controller._pecan.get('secured', False): + if not controller._pecan['check_permissions'](): + raise exc.HTTPUnauthorized + +def cross_boundary(prev_obj, obj): + """ + check the security as we move across a boundary + """ + if prev_obj is None: + return + + meta = getattr(prev_obj, '_pecan', {}) + + if meta.get('secured', False): + if obj not in meta.get('unlocked', []): + if not meta['check_permissions'](): + raise exc.HTTPUnauthorized def lookup_controller(obj, url_path): remainder = url_path notfound_handlers = [] + while True: try: obj, remainder = find_object(obj, remainder, notfound_handlers) + handle_security(obj) return obj, remainder except exc.HTTPNotFound: while notfound_handlers: @@ -21,7 +46,10 @@ def lookup_controller(obj, url_path): try: result = obj(*remainder) if result: + prev_obj = obj obj, remainder = result + # crossing controller boundary + cross_boundary(prev_obj, obj) break except TypeError, te: print 'Got exception calling lookup(): %s (%s)' % (te, te.args) @@ -30,9 +58,13 @@ def lookup_controller(obj, url_path): def find_object(obj, remainder, notfound_handlers): + prev_obj = None while True: if obj is None: raise exc.HTTPNotFound if iscontroller(obj): return obj, remainder + + # are we traversing to another controller + cross_boundary(prev_obj, obj) if remainder and remainder[0] == '': index = getattr(obj, 'index', None) @@ -57,8 +89,8 @@ def find_object(obj, remainder, notfound_handlers): if not remainder: raise exc.HTTPNotFound next, remainder = remainder[0], remainder[1:] + prev_obj = obj obj = getattr(obj, next, None) - def iscontroller(obj): - return getattr(obj, 'exposed', False) \ No newline at end of file + return getattr(obj, 'exposed', False) diff --git a/pecan/secure.py b/pecan/secure.py index 54aab4a..32d1f62 100644 --- a/pecan/secure.py +++ b/pecan/secure.py @@ -1,80 +1,73 @@ -from inspect import getmembers, ismethod +from inspect import getmembers, ismethod, isfunction +from decorators import _cfg from routing import iscontroller +__all__ = ['Any', 'Protected', 'unlock', 'secure'] -__all__ = ['secure', 'unlocked', 'SecureController'] +class _Unlocked(object): + """ + A wrapper class to declare a class as unlocked inside of a SecureController + """ + def __init__(self, obj): + self.obj = obj -def unlocked(func): - if not hasattr(func, 'pecan'): func.pecan = {} - func.pecan['unlocked'] = True - return func +class _SecureState(object): + def __init__(self, desc, boolean_value): + self.description = desc + self.boolean_value = boolean_value + def __repr__(self): + return '' % self.description + def __nonzero__(self): + return self.boolean_value + +Any = _SecureState('Any', False) +Protected = _SecureState('Protected', True) + + +def unlocked(func_or_obj): + if ismethod(func_or_obj) or isfunction(func_or_obj): + _cfg(func_or_obj)['secured'] = Any + return func_or_obj + else: + return _Unlocked(func_or_obj) def secure(check_permissions): def wrap(func): - if not hasattr(func, 'pecan'): func.pecan = {} - func.pecan['secured'] = True - func.pecan['check_permissions'] = check_permissions + cfg = _cfg(func) + cfg['secured'] = Protected + cfg['check_permissions'] = check_permissions return func return wrap -def walk_controller(root_class, controller): - if hasattr(controller, '_lookup'): - # TODO: what about this? - controller._check_security = root_class._perform_validation - - if not isinstance(controller, (int, dict)): - for name, value in getmembers(controller): - if name == 'controller': continue - - if ismethod(value): - if iscontroller(value) and not value.pecan.get('unlocked', False): - value.pecan['secured'] = True - value.pecan['check_permissions'] = root_class.check_permissions - elif hasattr(value, '__class__'): - if name.startswith('__') and name.endswith('__'): continue - walk_controller(root_class, value) - - class SecureController(object): """ - Used to apply security to a controller and its children. + Used to apply security to a controller. Implementations of SecureController should extend the `check_permissions` method to return a True or False - value (depending on whether or not the user has access + value (depending on whether or not the user has permissions to the controller). """ class __metaclass__(type): def __init__(cls, name, bases, dict_): - walk_controller(cls, cls) - - @classmethod - def check_permissions(cls): - return True - - -class UnlockedControllerMeta(type): - """ - Can be used to force (override) a controller and all of its - subcontrollers to be unlocked/unsecured. - - This has the same effect as applying @pecan.secure.unlocked - to every method in the class and its subclasses. - """ - def __init__(cls, name, bases, ns): - cls.walk_and_apply_unlocked(cls, cls) - - def walk_and_apply_unlocked(cls, root_class, controller): - if not isinstance(controller, (int, dict)): - for name, value in getmembers(controller): - if name == 'controller': continue + cls._pecan = dict(secured=Protected, check_permissions=cls.check_permissions, unlocked=[]) + for name, value in getmembers(cls): if ismethod(value): - if iscontroller(value): - value = unlocked(value) + if iscontroller(value) and value._pecan.get('secured') is not Any: + value._pecan['secured'] = Protected + value._pecan['check_permissions'] = cls.check_permissions elif hasattr(value, '__class__'): if name.startswith('__') and name.endswith('__'): continue - cls.walk_and_apply_unlocked(root_class, value) \ No newline at end of file + if isinstance(value, _Unlocked): + # mark it as unlocked and remove wrapper + cls._pecan['unlocked'].append(value.obj) + setattr(cls, name, value.obj) + + @classmethod + def check_permissions(cls): + return False + diff --git a/tests/test_secure.py b/tests/test_secure.py index 533161f..a2571ba 100644 --- a/tests/test_secure.py +++ b/tests/test_secure.py @@ -1,9 +1,15 @@ +from unittest import TestCase + from pecan import expose, make_app -from pecan.secure import secure, unlocked, SecureController, UnlockedControllerMeta +from pecan.secure import secure, unlocked, SecureController, Protected from webtest import TestApp +try: + set() +except: + from sets import Set as set + class TestSecure(object): - def test_simple_secure(self): authorized = False @@ -58,12 +64,8 @@ class TestSecure(object): assert response.status_int == 200 assert response.body == 'Allowed!' - def test_unlocked_meta(self): - + def test_unlocked_attribute(self): class AuthorizedSubController(object): - - __metaclass__ = UnlockedControllerMeta - @expose() def index(self): return 'Index' @@ -82,11 +84,7 @@ class TestSecure(object): def allowed(self): return 'Allowed!' - @classmethod - def check_permissions(self): - return False - - authorized = AuthorizedSubController() + authorized = unlocked(AuthorizedSubController()) class RootController(object): @expose() @@ -131,4 +129,148 @@ class TestSecure(object): response = app.get('/secret/authorized/allowed') assert response.status_int == 200 - assert response.body == 'Allowed!' \ No newline at end of file + assert response.body == 'Allowed!' + + def test_state_attribute(self): + from pecan.secure import Any, Protected + assert repr(Any) == '' + assert bool(Any) is False + + assert repr(Protected) == '' + assert bool(Protected) is True + +class TestObjectPathSecurity(TestCase): + def setUp(self): + permissions_checked = getattr(self, 'permissions_checked', set()) + class DeepSecretController(SecureController): + authorized = False + @expose() + @unlocked + def _lookup(self, someID, *remainder): + if someID == 'notfound': + return None + return SubController(someID), remainder + + @expose() + def index(self): + return 'Deep Secret' + + @classmethod + def check_permissions(self): + permissions_checked.add('deepsecret') + return self.authorized + + deepsecret_instance = DeepSecretController() + + class SubController(object): + def __init__(self, myID): + self.myID = myID + + @expose() + def index(self): + return 'Index %s' % self.myID + + deepsecret = DeepSecretController() + + class SecretController(SecureController): + authorized = False + @expose() + def _lookup(self, someID, *remainder): + if someID == 'notfound': + return None + return SubController(someID), remainder + + @classmethod + def check_permissions(self): + permissions_checked.add('secretcontroller') + return self.authorized + + class NotSecretController(object): + @expose() + def _lookup(self, someID, *remainder): + if someID == 'notfound': + return None + return SubController(someID), remainder + + class RootController(object): + secret = SecretController() + notsecret = NotSecretController() + + self.deepsecret_cls = DeepSecretController + self.secret_cls = SecretController + + self.permissions_checked = permissions_checked + self.app = TestApp(make_app(RootController(), static_root='tests/static')) + + def tearDown(self): + self.permissions_checked.clear() + self.secret_cls.authorized = False + self.deepsecret_cls.authorized = False + + def test_sub_of_both_not_secret(self): + response = self.app.get('/notsecret/hi/') + assert response.status_int == 200 + assert response.body == 'Index hi' + + def test_protected_lookup(self): + response = self.app.get('/secret/hi/', expect_errors=True) + assert response.status_int == 401 + + self.secret_cls.authorized = True + response = self.app.get('/secret/hi/') + assert response.status_int == 200 + assert response.body == 'Index hi' + assert 'secretcontroller' in self.permissions_checked + + def test_secured_notfound_lookup(self): + response = self.app.get('/secret/notfound/', expect_errors=True) + assert response.status_int == 404 + + def test_secret_through_lookup(self): + response = self.app.get('/notsecret/hi/deepsecret/', expect_errors=True) + assert response.status_int == 401 + + def test_layered_protection(self): + response = self.app.get('/secret/hi/deepsecret/', expect_errors=True) + assert response.status_int == 401 + assert 'secretcontroller' in self.permissions_checked + + self.secret_cls.authorized = True + response = self.app.get('/secret/hi/deepsecret/', expect_errors=True) + assert response.status_int == 401 + assert 'secretcontroller' in self.permissions_checked + assert 'deepsecret' in self.permissions_checked + + self.deepsecret_cls.authorized = True + response = self.app.get('/secret/hi/deepsecret/') + assert response.status_int == 200 + assert response.body == 'Deep Secret' + assert 'secretcontroller' in self.permissions_checked + assert 'deepsecret' in self.permissions_checked + + def test_cyclical_protection(self): + self.secret_cls.authorized = True + self.deepsecret_cls.authorized = True + response = self.app.get('/secret/1/deepsecret/2/deepsecret/') + assert response.status_int == 200 + assert response.body == 'Deep Secret' + assert 'secretcontroller' in self.permissions_checked + assert 'deepsecret' in self.permissions_checked + + def test_unlocked_lookup(self): + response = self.app.get('/notsecret/1/deepsecret/2/') + assert response.status_int == 200 + assert response.body == 'Index 2' + assert 'deepsecret' not in self.permissions_checked + + response = self.app.get('/notsecret/1/deepsecret/notfound/', expect_errors=True) + assert response.status_int == 404 + assert 'deepsecret' not in self.permissions_checked + + def test_mixed_protection(self): + self.secret_cls.authorized = True + response = self.app.get('/secret/1/deepsecret/notfound/', expect_errors=True) + assert response.status_int == 404 + assert 'secretcontroller' in self.permissions_checked + assert 'deepsecret' not in self.permissions_checked +