Merge "Allow policies for resource_types with wildcard"
commit
6cd1da5224
|
@ -94,8 +94,7 @@ class ResourceEnforcer(Enforcer):
|
||||||
super(ResourceEnforcer, self).__init__(
|
super(ResourceEnforcer, self).__init__(
|
||||||
default_rule=default_rule, **kwargs)
|
default_rule=default_rule, **kwargs)
|
||||||
|
|
||||||
def enforce(self, context, res_type, scope=None, target=None):
|
def _enforce(self, context, res_type, scope=None, target=None):
|
||||||
# NOTE(pas-ha): try/except just to log the exception
|
|
||||||
try:
|
try:
|
||||||
result = super(ResourceEnforcer, self).enforce(
|
result = super(ResourceEnforcer, self).enforce(
|
||||||
context, res_type,
|
context, res_type,
|
||||||
|
@ -107,8 +106,20 @@ class ResourceEnforcer(Enforcer):
|
||||||
if not result:
|
if not result:
|
||||||
if self.exc:
|
if self.exc:
|
||||||
raise self.exc(action=res_type)
|
raise self.exc(action=res_type)
|
||||||
else:
|
return result
|
||||||
return result
|
|
||||||
|
def enforce(self, context, res_type, scope=None, target=None):
|
||||||
|
# NOTE(pas-ha): try/except just to log the exception
|
||||||
|
result = self._enforce(context, res_type, scope, target)
|
||||||
|
|
||||||
|
if result:
|
||||||
|
# check for wildcard resource types
|
||||||
|
subparts = res_type.split("::")[:-1]
|
||||||
|
subparts.append('*')
|
||||||
|
res_type_wc = "::".join(subparts)
|
||||||
|
return self._enforce(context, res_type_wc, scope, target)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
def enforce_stack(self, stack, scope=None, target=None):
|
def enforce_stack(self, stack, scope=None, target=None):
|
||||||
for res in stack.resources.values():
|
for res in stack.resources.values():
|
||||||
|
|
|
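Review bot: Only the immediate parent wildcard is consulted — for `OS::Keystone::User` the new `enforce` builds `OS::Keystone::*` and stops, so a broader deny an operator writes as `OS::*` is never looked up and the default rule decides instead, which fails open for exactly the namespaces the commit title suggests can now be covered. Walking every prefix level, most specific first, and stopping at the first denial keeps the current semantics (the exact rule and the wildcard must both allow, and a type without `::` still consults the bare `*` that the current `split("::")[:-1]` already produces) while honouring the wider rules. The moved `# NOTE(pas-ha): try/except just to log the exception` comment now sits in `enforce`, where there is no try/except; it belongs back in `_enforce`, and `enforce` should carry a docstring stating that a wildcard denial overrides a specific allow. `enforce_stack` continues past the end of the hunk.
```python
    def enforce(self, context, res_type, scope=None, target=None):
        """Enforce the rule for res_type and for every wildcard covering it.

        The exact rule is checked first; if it allows, each enclosing
        wildcard (OS::Keystone::*, then OS::*, then *) must allow as well,
        so a wildcard denial always wins over a specific allow.
        """
        result = self._enforce(context, res_type, scope, target)
        if not result:
            return result

        parts = res_type.split("::")
        # most specific wildcard first; a type with no '::' still consults '*'
        for depth in range(len(parts) - 1, -1, -1):
            res_type_wc = "::".join(parts[:depth] + ['*'])
            result = self._enforce(context, res_type_wc, scope, target)
            if not result:
                break
        return result
```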
@ -1,6 +1,7 @@
|
||||||
{
|
{
|
||||||
"context_is_admin": "role:admin",
|
"context_is_admin": "role:admin",
|
||||||
|
|
||||||
"resource_types:OS::Test::AdminOnly": "rule:context_is_admin"
|
"resource_types:OS::Test::AdminOnly": "rule:context_is_admin",
|
||||||
|
"resource_types:OS::Keystone::*": "rule:context_is_admin"
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,14 +186,14 @@ class TestPolicyEnforcer(common.HeatTestCase):
|
||||||
enforcer = policy.ResourceEnforcer(
|
enforcer = policy.ResourceEnforcer(
|
||||||
policy_file=self.get_policy_file('resources.json'))
|
policy_file=self.get_policy_file('resources.json'))
|
||||||
res_type = "OS::Test::NotInPolicy"
|
res_type = "OS::Test::NotInPolicy"
|
||||||
self.assertIsNone(enforcer.enforce(context, res_type))
|
self.assertTrue(enforcer.enforce(context, res_type))
|
||||||
|
|
||||||
def test_resource_enforce_success(self):
|
def test_resource_enforce_success(self):
|
||||||
context = utils.dummy_context(roles=['admin'])
|
context = utils.dummy_context(roles=['admin'])
|
||||||
enforcer = policy.ResourceEnforcer(
|
enforcer = policy.ResourceEnforcer(
|
||||||
policy_file=self.get_policy_file('resources.json'))
|
policy_file=self.get_policy_file('resources.json'))
|
||||||
res_type = "OS::Test::AdminOnly"
|
res_type = "OS::Test::AdminOnly"
|
||||||
self.assertIsNone(enforcer.enforce(context, res_type))
|
self.assertTrue(enforcer.enforce(context, res_type))
|
||||||
|
|
||||||
def test_resource_enforce_fail(self):
|
def test_resource_enforce_fail(self):
|
||||||
context = utils.dummy_context(roles=['non-admin'])
|
context = utils.dummy_context(roles=['non-admin'])
|
||||||
|
@ -205,6 +205,16 @@ class TestPolicyEnforcer(common.HeatTestCase):
|
||||||
context, res_type)
|
context, res_type)
|
||||||
self.assertIn(res_type, ex.message)
|
self.assertIn(res_type, ex.message)
|
||||||
|
|
||||||
|
def test_resource_wildcard_enforce_fail(self):
|
||||||
|
context = utils.dummy_context(roles=['non-admin'])
|
||||||
|
enforcer = policy.ResourceEnforcer(
|
||||||
|
policy_file=self.get_policy_file('resources.json'))
|
||||||
|
res_type = "OS::Keystone::User"
|
||||||
|
ex = self.assertRaises(exception.Forbidden,
|
||||||
|
enforcer.enforce,
|
||||||
|
context, res_type)
|
||||||
|
self.assertIn(res_type.split("::", 1)[0], ex.message)
|
||||||
|
|
||||||
def test_resource_enforce_returns_false(self):
|
def test_resource_enforce_returns_false(self):
|
||||||
context = utils.dummy_context(roles=['non-admin'])
|
context = utils.dummy_context(roles=['non-admin'])
|
||||||
enforcer = policy.ResourceEnforcer(
|
enforcer = policy.ResourceEnforcer(
|
||||||
|
@ -212,6 +222,7 @@ class TestPolicyEnforcer(common.HeatTestCase):
|
||||||
exc=None)
|
exc=None)
|
||||||
res_type = "OS::Test::AdminOnly"
|
res_type = "OS::Test::AdminOnly"
|
||||||
self.assertFalse(enforcer.enforce(context, res_type))
|
self.assertFalse(enforcer.enforce(context, res_type))
|
||||||
|
self.assertIsNotNone(enforcer.enforce(context, res_type))
|
||||||
|
|
||||||
def test_resource_enforce_exc_on_false(self):
|
def test_resource_enforce_exc_on_false(self):
|
||||||
context = utils.dummy_context(roles=['non-admin'])
|
context = utils.dummy_context(roles=['non-admin'])
|
||||||
|
|
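Review bot: The assertion in `test_resource_wildcard_enforce_fail` is too weak to catch a regression — `self.assertIn(res_type.split("::", 1)[0], ex.message)` only checks that the substring `OS` appears in the Forbidden message, which nearly any message about a Heat resource would satisfy. Pin the rule that actually denied: `self.assertIn("OS::Keystone::*", ex.message)`, which is what the enforcer raises with today, or if the enforcer is changed to report the concrete type, `self.assertIn(res_type, ex.message)`. The new policy fixture also has no positive wildcard case (an `admin` context enforcing `OS::Keystone::User` should succeed) and no `exc=None` case showing that a wildcard denial returns `False` rather than raising; both paths are exercised only for the exact-match rule in the tests visible here.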