Make the 'admin' role configurable

Bug 1158434

This patch adds a new policy named 'context_is_admin' which defines
an admin user as a collection of roles or else. The quantum context
has been updated to check for this policy when setting the is_admin
flag.
This patch also adds a method for gathering 'admin' roles from policy
rules as current logic requires the context to be always populate with
the correct roles for admin rules, even when the context is implicitly
generated with get_admin_context or context.elevated.
Backward compatibility is ensuring by preserving the old behavior if
the 'context_is_admin' policy is not found in policy.json

Change-Id: I9acea75cca0c47e083a9149e358328ea3ca12d68
This commit is contained in:
Salvatore Orlando 2013-04-22 09:44:14 +02:00
parent 765baf8532
commit 35988f1393
4 changed files with 127 additions and 12 deletions

View File

@ -1,7 +1,8 @@
{
"admin_or_owner": "role:admin or tenant_id:%(tenant_id)s",
"admin_or_network_owner": "role:admin or tenant_id:%(network_tenant_id)s",
"admin_only": "role:admin",
"context_is_admin": "role:admin",
"admin_or_owner": "rule:context_is_admin or tenant_id:%(tenant_id)s",
"admin_or_network_owner": "rule:context_is_admin or tenant_id:%(network_tenant_id)s",
"admin_only": "rule:context_is_admin",
"regular_user": "",
"shared": "field:networks:shared=True",
"external": "field:networks:router:external=True",

View File

@ -24,6 +24,7 @@ from datetime import datetime
from quantum.db import api as db_api
from quantum.openstack.common import context as common_context
from quantum.openstack.common import log as logging
from quantum import policy
LOG = logging.getLogger(__name__)
@ -48,16 +49,22 @@ class ContextBase(common_context.RequestContext):
'context: %s'), kwargs)
super(ContextBase, self).__init__(user=user_id, tenant=tenant_id,
is_admin=is_admin)
self.roles = roles or []
if self.is_admin is None:
self.is_admin = 'admin' in [x.lower() for x in self.roles]
elif self.is_admin and 'admin' not in [x.lower() for x in self.roles]:
self.roles.append('admin')
self.read_deleted = read_deleted
if not timestamp:
timestamp = datetime.utcnow()
self.timestamp = timestamp
self._session = None
self.roles = roles or []
if self.is_admin is None:
self.is_admin = policy.check_is_admin(self)
elif self.is_admin:
# Ensure context is populated with admin roles
# TODO(salvatore-orlando): It should not be necessary
# to populate roles in artificially-generated contexts
# address in bp/make-authz-orthogonal
admin_roles = policy.get_admin_roles()
if admin_roles:
self.roles = list(set(self.roles) | set(admin_roles))
@property
def project_id(self):

View File

@ -31,6 +31,7 @@ from quantum.openstack.common import policy
LOG = logging.getLogger(__name__)
_POLICY_PATH = None
_POLICY_CACHE = {}
ADMIN_CTX_POLICY = 'context_is_admin'
cfg.CONF.import_opt('policy_file', 'quantum.common.config')
@ -207,3 +208,48 @@ def enforce(context, action, target, plugin=None):
credentials = context.to_dict()
return policy.check(match_rule, real_target, credentials,
exceptions.PolicyNotAuthorized, action=action)
def check_is_admin(context):
"""Verify context has admin rights according to policy settings."""
init()
# the target is user-self
credentials = context.to_dict()
target = credentials
# Backward compatibility: if ADMIN_CTX_POLICY is not
# found, default to validating role:admin
admin_policy = (ADMIN_CTX_POLICY in policy._rules
and ADMIN_CTX_POLICY or 'role:admin')
return policy.check(admin_policy, target, credentials)
def _extract_roles(rule, roles):
if isinstance(rule, policy.RoleCheck):
roles.append(rule.match.lower())
elif isinstance(rule, policy.RuleCheck):
_extract_roles(policy._rules[rule.match], roles)
elif hasattr(rule, 'rules'):
for rule in rule.rules:
_extract_roles(rule, roles)
def get_admin_roles():
"""Return a list of roles which are granted admin rights according
to policy settings.
"""
# NOTE(salvatore-orlando): This function provides a solution for
# populating implicit contexts with the appropriate roles so that
# they correctly pass policy checks, and will become superseded
# once all explicit policy checks are removed from db logic and
# plugin modules. For backward compatibility it returns the literal
# admin if ADMIN_CTX_POLICY is not defined
init()
if not policy._rules or ADMIN_CTX_POLICY not in policy._rules:
return ['admin']
try:
admin_ctx_rule = policy._rules[ADMIN_CTX_POLICY]
except (KeyError, TypeError):
return
roles = []
_extract_roles(admin_ctx_rule, roles)
return roles

View File

@ -35,7 +35,7 @@ class PolicyFileTestCase(base.BaseTestCase):
super(PolicyFileTestCase, self).setUp()
policy.reset()
self.addCleanup(policy.reset)
self.context = context.Context('fake', 'fake')
self.context = context.Context('fake', 'fake', is_admin=False)
self.target = {}
self.tempdir = self.useFixture(fixtures.TempDir())
@ -200,11 +200,15 @@ class QuantumPolicyTestCase(base.BaseTestCase):
policy.reset()
policy.init()
self.addCleanup(policy.reset)
self.admin_only_legacy = "role:admin"
self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"admin_or_network_owner": "role:admin or "
"context_is_admin": "role:admin",
"admin_or_network_owner": "rule:context_is_admin or "
"tenant_id:%(network_tenant_id)s",
"admin_or_owner": "role:admin or tenant_id:%(tenant_id)s",
"admin_only": "role:admin",
"admin_or_owner": ("rule:context_is_admin or "
"tenant_id:%(tenant_id)s"),
"admin_only": "rule:context_is_admin",
"regular_user": "role:user",
"shared": "field:networks:shared=True",
"external": "field:networks:router:external=True",
@ -278,12 +282,31 @@ class QuantumPolicyTestCase(base.BaseTestCase):
def test_enforce_adminonly_attribute_update(self):
self._test_enforce_adminonly_attribute('update_network')
def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
del self.rules[policy.ADMIN_CTX_POLICY]
self.rules['admin_only'] = common_policy.parse_rule(
self.admin_only_legacy)
self.rules['admin_or_owner'] = common_policy.parse_rule(
self.admin_or_owner_legacy)
self._test_enforce_adminonly_attribute('create_network')
def test_enforce_adminonly_attribute_nonadminctx_returns_403(self):
action = "create_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, action, target, None)
def test_enforce_adminonly_nonadminctx_no_ctx_is_admin_policy_403(self):
del self.rules[policy.ADMIN_CTX_POLICY]
self.rules['admin_only'] = common_policy.parse_rule(
self.admin_only_legacy)
self.rules['admin_or_owner'] = common_policy.parse_rule(
self.admin_or_owner_legacy)
action = "create_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, action, target, None)
def test_enforce_regularuser_on_read(self):
action = "get_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
@ -300,3 +323,41 @@ class QuantumPolicyTestCase(base.BaseTestCase):
target = {'network_id': 'whatever'}
result = policy.enforce(self.context, action, target, self.plugin)
self.assertTrue(result)
def test_get_roles_context_is_admin_rule_missing(self):
rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"some_other_rule": "role:admin",
}.items())
common_policy.set_rules(common_policy.Rules(rules))
# 'admin' role is expected for bw compatibility
self.assertEqual(['admin'], policy.get_admin_roles())
def test_get_roles_with_role_check(self):
rules = dict((k, common_policy.parse_rule(v)) for k, v in {
policy.ADMIN_CTX_POLICY: "role:admin",
}.items())
common_policy.set_rules(common_policy.Rules(rules))
self.assertEqual(['admin'], policy.get_admin_roles())
def test_get_roles_with_rule_check(self):
rules = dict((k, common_policy.parse_rule(v)) for k, v in {
policy.ADMIN_CTX_POLICY: "rule:some_other_rule",
"some_other_rule": "role:admin",
}.items())
common_policy.set_rules(common_policy.Rules(rules))
self.assertEqual(['admin'], policy.get_admin_roles())
def test_get_roles_with_or_check(self):
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
policy.ADMIN_CTX_POLICY: "rule:rule1 or rule:rule2",
"rule1": "role:admin_1",
"rule2": "role:admin_2"
}.items())
self.assertEqual(['admin_1', 'admin_2'],
policy.get_admin_roles())
def test_get_roles_with_other_rules(self):
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
policy.ADMIN_CTX_POLICY: "role:xxx or other:value",
}.items())
self.assertEqual(['xxx'], policy.get_admin_roles())