Moving policy engine implementation
This is the first step in moving the policy engine to django_openstack_auth. It makes the policy check method pluggable in openstack_dashboard as it is in horizon. To do this, the wrapper around the policy engine is moved to a new file isolated from the policy check method. A thin check method is left in policy.py to allow the code and policy mix-ins to behave as now. The existing policy test file is also moved for relocation. A new test file has been added to exercise the simpler check method. Once the actual engine is moved to django_openstack_auth, we'll be able to remove policy_backend.py and the test version of policy_backend.py as well. Partially Implemenents: blueprint move-policy-engine Change-Id: I591bcbe69bf255f1aa0346fdf869fac86d2e6d3d
This commit is contained in:
parent
04b692cc30
commit
39bd444a0c
|
@ -20,6 +20,7 @@ import os
|
||||||
import django
|
import django
|
||||||
from django.core.urlresolvers import reverse
|
from django.core.urlresolvers import reverse
|
||||||
from django import http
|
from django import http
|
||||||
|
from django.test.utils import override_settings
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from django.utils import unittest
|
from django.utils import unittest
|
||||||
|
|
||||||
|
@ -31,6 +32,7 @@ from horizon.workflows import views
|
||||||
|
|
||||||
from openstack_dashboard import api
|
from openstack_dashboard import api
|
||||||
from openstack_dashboard.dashboards.identity.projects import workflows
|
from openstack_dashboard.dashboards.identity.projects import workflows
|
||||||
|
from openstack_dashboard import policy_backend
|
||||||
from openstack_dashboard.test import helpers as test
|
from openstack_dashboard.test import helpers as test
|
||||||
from openstack_dashboard import usage
|
from openstack_dashboard import usage
|
||||||
from openstack_dashboard.usage import quotas
|
from openstack_dashboard.usage import quotas
|
||||||
|
@ -83,6 +85,7 @@ class TenantsViewTests(test.BaseAdminViewTests):
|
||||||
|
|
||||||
|
|
||||||
class ProjectsViewNonAdminTests(test.TestCase):
|
class ProjectsViewNonAdminTests(test.TestCase):
|
||||||
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
@test.create_stubs({api.keystone: ('tenant_list',)})
|
@test.create_stubs({api.keystone: ('tenant_list',)})
|
||||||
def test_index(self):
|
def test_index(self):
|
||||||
api.keystone.tenant_list(IsA(http.HttpRequest),
|
api.keystone.tenant_list(IsA(http.HttpRequest),
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
@ -13,154 +11,21 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
"""Policy engine for Horizon"""
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import os.path
|
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from openstack_auth import utils as auth_utils
|
|
||||||
from oslo_config import cfg
|
|
||||||
|
|
||||||
from openstack_dashboard.openstack.common import policy
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
|
||||||
# Policy Enforcer has been updated to take in a policy directory
|
|
||||||
# as a config option. However, the default value in is set to
|
|
||||||
# ['policy.d'] which causes the code to break. Set the default
|
|
||||||
# value to empty list for now.
|
|
||||||
CONF.policy_dirs = []
|
|
||||||
|
|
||||||
_ENFORCER = None
|
|
||||||
_BASE_PATH = getattr(settings, 'POLICY_FILES_PATH', '')
|
|
||||||
|
|
||||||
|
|
||||||
def _get_enforcer():
|
|
||||||
global _ENFORCER
|
|
||||||
if not _ENFORCER:
|
|
||||||
_ENFORCER = {}
|
|
||||||
policy_files = getattr(settings, 'POLICY_FILES', {})
|
|
||||||
for service in policy_files.keys():
|
|
||||||
enforcer = policy.Enforcer()
|
|
||||||
enforcer.policy_path = os.path.join(_BASE_PATH,
|
|
||||||
policy_files[service])
|
|
||||||
if os.path.isfile(enforcer.policy_path):
|
|
||||||
LOG.debug("adding enforcer for service: %s" % service)
|
|
||||||
_ENFORCER[service] = enforcer
|
|
||||||
else:
|
|
||||||
LOG.warn("policy file for service: %s not found at %s" %
|
|
||||||
(service, enforcer.policy_path))
|
|
||||||
return _ENFORCER
|
|
||||||
|
|
||||||
|
|
||||||
def reset():
|
|
||||||
global _ENFORCER
|
|
||||||
_ENFORCER = None
|
|
||||||
|
|
||||||
|
|
||||||
def check(actions, request, target=None):
|
def check(actions, request, target=None):
|
||||||
"""Check user permission.
|
"""Wrapper of the configurable policy method."""
|
||||||
|
|
||||||
Check if the user has permission to the action according
|
policy_check = getattr(settings, "POLICY_CHECK_FUNCTION", None)
|
||||||
to policy setting.
|
|
||||||
|
|
||||||
:param actions: list of scope and action to do policy checks on,
|
if policy_check:
|
||||||
the composition of which is (scope, action)
|
return policy_check(actions, request, target)
|
||||||
|
|
||||||
* scope: service type managing the policy for action
|
|
||||||
|
|
||||||
* action: string representing the action to be checked
|
|
||||||
|
|
||||||
this should be colon separated for clarity.
|
|
||||||
i.e.
|
|
||||||
|
|
||||||
| compute:create_instance
|
|
||||||
| compute:attach_volume
|
|
||||||
| volume:attach_volume
|
|
||||||
|
|
||||||
for a policy action that requires a single action, actions
|
|
||||||
should look like
|
|
||||||
|
|
||||||
| "(("compute", "compute:create_instance"),)"
|
|
||||||
|
|
||||||
for a multiple action check, actions should look like
|
|
||||||
| "(("identity", "identity:list_users"),
|
|
||||||
| ("identity", "identity:list_roles"))"
|
|
||||||
|
|
||||||
:param request: django http request object. If not specified, credentials
|
|
||||||
must be passed.
|
|
||||||
:param target: dictionary representing the object of the action
|
|
||||||
for object creation this should be a dictionary
|
|
||||||
representing the location of the object e.g.
|
|
||||||
{'project_id': object.project_id}
|
|
||||||
:returns: boolean if the user has permission or not for the actions.
|
|
||||||
"""
|
|
||||||
if target is None:
|
|
||||||
target = {}
|
|
||||||
user = auth_utils.get_user(request)
|
|
||||||
|
|
||||||
# Several service policy engines default to a project id check for
|
|
||||||
# ownership. Since the user is already scoped to a project, if a
|
|
||||||
# different project id has not been specified use the currently scoped
|
|
||||||
# project's id.
|
|
||||||
#
|
|
||||||
# The reason is the operator can edit the local copies of the service
|
|
||||||
# policy file. If a rule is removed, then the default rule is used. We
|
|
||||||
# don't want to block all actions because the operator did not fully
|
|
||||||
# understand the implication of editing the policy file. Additionally,
|
|
||||||
# the service APIs will correct us if we are too permissive.
|
|
||||||
if target.get('project_id') is None:
|
|
||||||
target['project_id'] = user.project_id
|
|
||||||
# same for user_id
|
|
||||||
if target.get('user_id') is None:
|
|
||||||
target['user_id'] = user.id
|
|
||||||
# same for domain_id
|
|
||||||
if target.get('domain_id') is None:
|
|
||||||
target['domain_id'] = user.domain_id
|
|
||||||
|
|
||||||
credentials = _user_to_credentials(request, user)
|
|
||||||
|
|
||||||
enforcer = _get_enforcer()
|
|
||||||
|
|
||||||
for action in actions:
|
|
||||||
scope, action = action[0], action[1]
|
|
||||||
if scope in enforcer:
|
|
||||||
# if any check fails return failure
|
|
||||||
if not enforcer[scope].enforce(action, target, credentials):
|
|
||||||
# to match service implementations, if a rule is not found,
|
|
||||||
# use the default rule for that service policy
|
|
||||||
#
|
|
||||||
# waiting to make the check because the first call to
|
|
||||||
# enforce loads the rules
|
|
||||||
if action not in enforcer[scope].rules:
|
|
||||||
if not enforcer[scope].enforce('default',
|
|
||||||
target, credentials):
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
# if no policy for scope, allow action, underlying API will
|
|
||||||
# ultimately block the action if not permitted, treat as though
|
|
||||||
# allowed
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def _user_to_credentials(request, user):
|
|
||||||
if not hasattr(user, "_credentials"):
|
|
||||||
roles = [role['name'] for role in user.roles]
|
|
||||||
user._credentials = {'user_id': user.id,
|
|
||||||
'token': user.token,
|
|
||||||
'username': user.username,
|
|
||||||
'project_id': user.project_id,
|
|
||||||
'project_name': user.project_name,
|
|
||||||
'domain_id': user.user_domain_id,
|
|
||||||
'is_admin': user.is_superuser,
|
|
||||||
'roles': roles}
|
|
||||||
return user._credentials
|
|
||||||
|
|
||||||
|
|
||||||
class PolicyTargetMixin(object):
|
class PolicyTargetMixin(object):
|
||||||
"""Mixin that adds the get_policy_target function
|
"""Mixin that adds the get_policy_target function
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,160 @@
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""Policy engine for Horizon"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os.path
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
|
from openstack_auth import utils as auth_utils
|
||||||
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
from openstack_dashboard.openstack.common import policy
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
# Policy Enforcer has been updated to take in a policy directory
|
||||||
|
# as a config option. However, the default value in is set to
|
||||||
|
# ['policy.d'] which causes the code to break. Set the default
|
||||||
|
# value to empty list for now.
|
||||||
|
CONF.policy_dirs = []
|
||||||
|
|
||||||
|
_ENFORCER = None
|
||||||
|
_BASE_PATH = getattr(settings, 'POLICY_FILES_PATH', '')
|
||||||
|
|
||||||
|
|
||||||
|
def _get_enforcer():
|
||||||
|
global _ENFORCER
|
||||||
|
if not _ENFORCER:
|
||||||
|
_ENFORCER = {}
|
||||||
|
policy_files = getattr(settings, 'POLICY_FILES', {})
|
||||||
|
for service in policy_files.keys():
|
||||||
|
enforcer = policy.Enforcer()
|
||||||
|
enforcer.policy_path = os.path.join(_BASE_PATH,
|
||||||
|
policy_files[service])
|
||||||
|
if os.path.isfile(enforcer.policy_path):
|
||||||
|
LOG.debug("adding enforcer for service: %s" % service)
|
||||||
|
_ENFORCER[service] = enforcer
|
||||||
|
else:
|
||||||
|
LOG.warn("policy file for service: %s not found at %s" %
|
||||||
|
(service, enforcer.policy_path))
|
||||||
|
return _ENFORCER
|
||||||
|
|
||||||
|
|
||||||
|
def reset():
|
||||||
|
global _ENFORCER
|
||||||
|
_ENFORCER = None
|
||||||
|
|
||||||
|
|
||||||
|
def check(actions, request, target=None):
|
||||||
|
"""Check user permission.
|
||||||
|
|
||||||
|
Check if the user has permission to the action according
|
||||||
|
to policy setting.
|
||||||
|
|
||||||
|
:param actions: list of scope and action to do policy checks on,
|
||||||
|
the composition of which is (scope, action)
|
||||||
|
|
||||||
|
* scope: service type managing the policy for action
|
||||||
|
|
||||||
|
* action: string representing the action to be checked
|
||||||
|
|
||||||
|
this should be colon separated for clarity.
|
||||||
|
i.e.
|
||||||
|
|
||||||
|
| compute:create_instance
|
||||||
|
| compute:attach_volume
|
||||||
|
| volume:attach_volume
|
||||||
|
|
||||||
|
for a policy action that requires a single action, actions
|
||||||
|
should look like
|
||||||
|
|
||||||
|
| "(("compute", "compute:create_instance"),)"
|
||||||
|
|
||||||
|
for a multiple action check, actions should look like
|
||||||
|
| "(("identity", "identity:list_users"),
|
||||||
|
| ("identity", "identity:list_roles"))"
|
||||||
|
|
||||||
|
:param request: django http request object. If not specified, credentials
|
||||||
|
must be passed.
|
||||||
|
:param target: dictionary representing the object of the action
|
||||||
|
for object creation this should be a dictionary
|
||||||
|
representing the location of the object e.g.
|
||||||
|
{'project_id': object.project_id}
|
||||||
|
:returns: boolean if the user has permission or not for the actions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if target is None:
|
||||||
|
target = {}
|
||||||
|
user = auth_utils.get_user(request)
|
||||||
|
|
||||||
|
# Several service policy engines default to a project id check for
|
||||||
|
# ownership. Since the user is already scoped to a project, if a
|
||||||
|
# different project id has not been specified use the currently scoped
|
||||||
|
# project's id.
|
||||||
|
#
|
||||||
|
# The reason is the operator can edit the local copies of the service
|
||||||
|
# policy file. If a rule is removed, then the default rule is used. We
|
||||||
|
# don't want to block all actions because the operator did not fully
|
||||||
|
# understand the implication of editing the policy file. Additionally,
|
||||||
|
# the service APIs will correct us if we are too permissive.
|
||||||
|
if target.get('project_id') is None:
|
||||||
|
target['project_id'] = user.project_id
|
||||||
|
# same for user_id
|
||||||
|
if target.get('user_id') is None:
|
||||||
|
target['user_id'] = user.id
|
||||||
|
# same for domain_id
|
||||||
|
if target.get('domain_id') is None:
|
||||||
|
target['domain_id'] = user.domain_id
|
||||||
|
|
||||||
|
credentials = _user_to_credentials(request, user)
|
||||||
|
|
||||||
|
enforcer = _get_enforcer()
|
||||||
|
|
||||||
|
for action in actions:
|
||||||
|
scope, action = action[0], action[1]
|
||||||
|
if scope in enforcer:
|
||||||
|
# if any check fails return failure
|
||||||
|
if not enforcer[scope].enforce(action, target, credentials):
|
||||||
|
# to match service implementations, if a rule is not found,
|
||||||
|
# use the default rule for that service policy
|
||||||
|
#
|
||||||
|
# waiting to make the check because the first call to
|
||||||
|
# enforce loads the rules
|
||||||
|
if action not in enforcer[scope].rules:
|
||||||
|
if not enforcer[scope].enforce('default',
|
||||||
|
target, credentials):
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
# if no policy for scope, allow action, underlying API will
|
||||||
|
# ultimately block the action if not permitted, treat as though
|
||||||
|
# allowed
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _user_to_credentials(request, user):
|
||||||
|
if not hasattr(user, "_credentials"):
|
||||||
|
roles = [role['name'] for role in user.roles]
|
||||||
|
user._credentials = {'user_id': user.id,
|
||||||
|
'token': user.token,
|
||||||
|
'username': user.username,
|
||||||
|
'project_id': user.project_id,
|
||||||
|
'project_name': user.project_name,
|
||||||
|
'domain_id': user.user_domain_id,
|
||||||
|
'is_admin': user.is_superuser,
|
||||||
|
'roles': roles}
|
||||||
|
return user._credentials
|
|
@ -276,8 +276,8 @@ if not SECRET_KEY:
|
||||||
SECRET_KEY = secret_key.generate_or_read_from_file(os.path.join(LOCAL_PATH,
|
SECRET_KEY = secret_key.generate_or_read_from_file(os.path.join(LOCAL_PATH,
|
||||||
'.secret_key_store'))
|
'.secret_key_store'))
|
||||||
|
|
||||||
from openstack_dashboard import policy
|
from openstack_dashboard import policy_backend
|
||||||
POLICY_CHECK_FUNCTION = policy.check
|
POLICY_CHECK_FUNCTION = policy_backend.check
|
||||||
|
|
||||||
# Add HORIZON_CONFIG to the context information for offline compression
|
# Add HORIZON_CONFIG to the context information for offline compression
|
||||||
COMPRESS_OFFLINE_CONTEXT = {
|
COMPRESS_OFFLINE_CONTEXT = {
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
|
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
@ -12,82 +11,30 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
from django.test.utils import override_settings
|
||||||
|
|
||||||
from openstack_dashboard import policy
|
from openstack_dashboard import policy
|
||||||
|
from openstack_dashboard import policy_backend
|
||||||
from openstack_dashboard.test import helpers as test
|
from openstack_dashboard.test import helpers as test
|
||||||
|
|
||||||
|
|
||||||
class PolicyTestCase(test.TestCase):
|
class PolicyTestCase(test.TestCase):
|
||||||
def test_policy_file_load(self):
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
policy.reset()
|
def test_policy_check_set(self):
|
||||||
enforcer = policy._get_enforcer()
|
|
||||||
self.assertEqual(2, len(enforcer))
|
|
||||||
self.assertTrue('identity' in enforcer)
|
|
||||||
self.assertTrue('compute' in enforcer)
|
|
||||||
|
|
||||||
def test_policy_reset(self):
|
|
||||||
policy._get_enforcer()
|
|
||||||
self.assertEqual(2, len(policy._ENFORCER))
|
|
||||||
policy.reset()
|
|
||||||
self.assertIsNone(policy._ENFORCER)
|
|
||||||
|
|
||||||
def test_check_admin_required_false(self):
|
|
||||||
policy.reset()
|
|
||||||
value = policy.check((("identity", "admin_required"),),
|
value = policy.check((("identity", "admin_required"),),
|
||||||
request=self.request)
|
request=self.request)
|
||||||
self.assertFalse(value)
|
self.assertFalse(value)
|
||||||
|
|
||||||
def test_check_identity_rule_not_found_false(self):
|
@override_settings(POLICY_CHECK_FUNCTION=None)
|
||||||
policy.reset()
|
def test_policy_check_not_set(self):
|
||||||
value = policy.check((("identity", "i_dont_exist"),),
|
|
||||||
request=self.request)
|
|
||||||
# this should fail because the default check for
|
|
||||||
# identity is admin_required
|
|
||||||
self.assertFalse(value)
|
|
||||||
|
|
||||||
def test_check_nova_context_is_admin_false(self):
|
|
||||||
policy.reset()
|
|
||||||
value = policy.check((("compute", "context_is_admin"),),
|
|
||||||
request=self.request)
|
|
||||||
self.assertFalse(value)
|
|
||||||
|
|
||||||
def test_compound_check_false(self):
|
|
||||||
policy.reset()
|
|
||||||
value = policy.check((("identity", "admin_required"),
|
|
||||||
("identity", "identity:default"),),
|
|
||||||
request=self.request)
|
|
||||||
self.assertFalse(value)
|
|
||||||
|
|
||||||
def test_scope_not_found(self):
|
|
||||||
policy.reset()
|
|
||||||
value = policy.check((("dummy", "default"),),
|
|
||||||
request=self.request)
|
|
||||||
self.assertTrue(value)
|
|
||||||
|
|
||||||
|
|
||||||
class PolicyTestCaseAdmin(test.BaseAdminViewTests):
|
|
||||||
def test_check_admin_required_true(self):
|
|
||||||
policy.reset()
|
|
||||||
value = policy.check((("identity", "admin_required"),),
|
value = policy.check((("identity", "admin_required"),),
|
||||||
request=self.request)
|
request=self.request)
|
||||||
self.assertTrue(value)
|
self.assertTrue(value)
|
||||||
|
|
||||||
def test_check_identity_rule_not_found_true(self):
|
|
||||||
policy.reset()
|
|
||||||
value = policy.check((("identity", "i_dont_exist"),),
|
|
||||||
request=self.request)
|
|
||||||
# this should succeed because the default check for
|
|
||||||
# identity is admin_required
|
|
||||||
self.assertTrue(value)
|
|
||||||
|
|
||||||
def test_compound_check_true(self):
|
class PolicyBackendTestCaseAdmin(test.BaseAdminViewTests):
|
||||||
policy.reset()
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
value = policy.check((("identity", "admin_required"),
|
def test_policy_check_set_admin(self):
|
||||||
("identity", "identity:default"),),
|
value = policy.check((("identity", "admin_required"),),
|
||||||
request=self.request)
|
|
||||||
self.assertTrue(value)
|
|
||||||
|
|
||||||
def test_check_nova_context_is_admin_true(self):
|
|
||||||
policy.reset()
|
|
||||||
value = policy.check((("compute", "context_is_admin"),),
|
|
||||||
request=self.request)
|
request=self.request)
|
||||||
self.assertTrue(value)
|
self.assertTrue(value)
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from django.test.utils import override_settings
|
||||||
|
|
||||||
|
from openstack_dashboard import policy
|
||||||
|
from openstack_dashboard import policy_backend
|
||||||
|
from openstack_dashboard.test import helpers as test
|
||||||
|
|
||||||
|
|
||||||
|
class PolicyBackendTestCase(test.TestCase):
|
||||||
|
def test_policy_file_load(self):
|
||||||
|
policy_backend.reset()
|
||||||
|
enforcer = policy_backend._get_enforcer()
|
||||||
|
self.assertEqual(2, len(enforcer))
|
||||||
|
self.assertTrue('identity' in enforcer)
|
||||||
|
self.assertTrue('compute' in enforcer)
|
||||||
|
|
||||||
|
def test_policy_reset(self):
|
||||||
|
policy_backend._get_enforcer()
|
||||||
|
self.assertEqual(2, len(policy_backend._ENFORCER))
|
||||||
|
policy_backend.reset()
|
||||||
|
self.assertIsNone(policy_backend._ENFORCER)
|
||||||
|
|
||||||
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
|
def test_check_admin_required_false(self):
|
||||||
|
policy_backend.reset()
|
||||||
|
value = policy.check((("identity", "admin_required"),),
|
||||||
|
request=self.request)
|
||||||
|
self.assertFalse(value)
|
||||||
|
|
||||||
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
|
def test_check_identity_rule_not_found_false(self):
|
||||||
|
policy_backend.reset()
|
||||||
|
value = policy.check((("identity", "i_dont_exist"),),
|
||||||
|
request=self.request)
|
||||||
|
# this should fail because the default check for
|
||||||
|
# identity is admin_required
|
||||||
|
self.assertFalse(value)
|
||||||
|
|
||||||
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
|
def test_check_nova_context_is_admin_false(self):
|
||||||
|
policy_backend.reset()
|
||||||
|
value = policy.check((("compute", "context_is_admin"),),
|
||||||
|
request=self.request)
|
||||||
|
self.assertFalse(value)
|
||||||
|
|
||||||
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
|
def test_compound_check_false(self):
|
||||||
|
policy_backend.reset()
|
||||||
|
value = policy.check((("identity", "admin_required"),
|
||||||
|
("identity", "identity:default"),),
|
||||||
|
request=self.request)
|
||||||
|
self.assertFalse(value)
|
||||||
|
|
||||||
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
|
def test_scope_not_found(self):
|
||||||
|
policy_backend.reset()
|
||||||
|
value = policy.check((("dummy", "default"),),
|
||||||
|
request=self.request)
|
||||||
|
self.assertTrue(value)
|
||||||
|
|
||||||
|
|
||||||
|
class PolicyBackendTestCaseAdmin(test.BaseAdminViewTests):
|
||||||
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
|
def test_check_admin_required_true(self):
|
||||||
|
policy_backend.reset()
|
||||||
|
value = policy.check((("identity", "admin_required"),),
|
||||||
|
request=self.request)
|
||||||
|
self.assertTrue(value)
|
||||||
|
|
||||||
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
|
def test_check_identity_rule_not_found_true(self):
|
||||||
|
policy_backend.reset()
|
||||||
|
value = policy.check((("identity", "i_dont_exist"),),
|
||||||
|
request=self.request)
|
||||||
|
# this should succeed because the default check for
|
||||||
|
# identity is admin_required
|
||||||
|
self.assertTrue(value)
|
||||||
|
|
||||||
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
|
def test_compound_check_true(self):
|
||||||
|
policy_backend.reset()
|
||||||
|
value = policy.check((("identity", "admin_required"),
|
||||||
|
("identity", "identity:default"),),
|
||||||
|
request=self.request)
|
||||||
|
self.assertTrue(value)
|
||||||
|
|
||||||
|
@override_settings(POLICY_CHECK_FUNCTION=policy_backend.check)
|
||||||
|
def test_check_nova_context_is_admin_true(self):
|
||||||
|
policy_backend.reset()
|
||||||
|
value = policy.check((("compute", "context_is_admin"),),
|
||||||
|
request=self.request)
|
||||||
|
self.assertTrue(value)
|
Loading…
Reference in New Issue