Use policy based rule to define context.is_admin

When creating a context set is_admin based on policy instead of
hard coded requirement for a role named 'admin'.

Fixes: bug# 1053181

Change-Id: I9c05b8ecde887467557c4f01e074145b6a6372aa
This commit is contained in:
Clay Gerrard 2012-10-01 11:33:25 -05:00
parent 4cfb90a9b8
commit bdee787cf7
5 changed files with 68 additions and 5 deletions

View File

@ -24,6 +24,7 @@ import copy
from cinder.openstack.common import log as logging
from cinder.openstack.common import local
from cinder.openstack.common import timeutils
from cinder import policy
from cinder import utils
@ -65,7 +66,7 @@ class RequestContext(object):
self.roles = roles or []
self.is_admin = is_admin
if self.is_admin is None:
self.is_admin = 'admin' in [x.lower() for x in self.roles]
self.is_admin = policy.check_is_admin(self.roles)
elif self.is_admin and 'admin' not in self.roles:
self.roles.append('admin')
self.read_deleted = read_deleted

View File

@ -86,3 +86,21 @@ def enforce(context, action, target):
policy.enforce(match_list, target, credentials,
exception.PolicyNotAuthorized, action=action)
def check_is_admin(roles):
"""Whether or not roles contains 'admin' role according to policy setting.
"""
init()
action = 'context_is_admin'
match_list = ('rule:%s' % action,)
# include project_id on target to avoid KeyError if context_is_admin
# policy definition is missing, and default admin_or_owner rule
# attempts to apply. Since our credentials dict does not include a
# project_id, this target can never match as a generic rule.
target = {'project_id': ''}
credentials = {'roles': roles}
return policy.enforce(match_list, target, credentials)

View File

@ -1,5 +1,6 @@
{
"admin_api": [["role:admin"]],
"context_is_admin": [["role:admin"]],
"admin_api": [["is_admin:True"]],
"volume:create": [],
"volume:get": [],

View File

@ -36,8 +36,9 @@ FLAGS = flags.FLAGS
class PolicyFileTestCase(test.TestCase):
def setUp(self):
super(PolicyFileTestCase, self).setUp()
policy.reset()
# since is_admin is defined by policy, create context before reset
self.context = context.RequestContext('fake', 'fake')
policy.reset()
self.target = {}
def tearDown(self):
@ -188,3 +189,44 @@ class DefaultPolicyTestCase(test.TestCase):
self._set_brain("default_noexist")
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.context, "example:noexist", {})
class ContextIsAdminPolicyTestCase(test.TestCase):
def setUp(self):
super(ContextIsAdminPolicyTestCase, self).setUp()
policy.reset()
policy.init()
def test_default_admin_role_is_admin(self):
ctx = context.RequestContext('fake', 'fake', roles=['johnny-admin'])
self.assertFalse(ctx.is_admin)
ctx = context.RequestContext('fake', 'fake', roles=['admin'])
self.assert_(ctx.is_admin)
def test_custom_admin_role_is_admin(self):
# define explict rules for context_is_admin
rules = {
'context_is_admin': [["role:administrator"], ["role:johnny-admin"]]
}
brain = common_policy.Brain(rules, FLAGS.policy_default_rule)
common_policy.set_brain(brain)
ctx = context.RequestContext('fake', 'fake', roles=['johnny-admin'])
self.assert_(ctx.is_admin)
ctx = context.RequestContext('fake', 'fake', roles=['administrator'])
self.assert_(ctx.is_admin)
# default rule no longer applies
ctx = context.RequestContext('fake', 'fake', roles=['admin'])
self.assertFalse(ctx.is_admin)
def test_context_is_admin_undefined(self):
rules = {
"admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]],
"default": [["rule:admin_or_owner"]],
}
brain = common_policy.Brain(rules, FLAGS.policy_default_rule)
common_policy.set_brain(brain)
ctx = context.RequestContext('fake', 'fake')
self.assertFalse(ctx.is_admin)
ctx = context.RequestContext('fake', 'fake', roles=['admin'])
self.assert_(ctx.is_admin)

View File

@ -1,8 +1,9 @@
{
"admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]],
"context_is_admin": [["role:admin"]],
"admin_or_owner": [["is_admin:True"], ["project_id:%(project_id)s"]],
"default": [["rule:admin_or_owner"]],
"admin_api": [["role:admin"]],
"admin_api": [["is_admin:True"]],
"volume:create": [],
"volume:get_all": [],