Enable policy control over external_gateway_info sub-attributes

Part 2 of blueprint l3-ext-gw-modes

This patch extends the logic for building policy rule matches in order to
include sub-attributes as well. This logic will be leveraged by the
ext-gw-mode api extension.

Change-Id: I7f46a395597b71bb1c5110aa4e792a04a5010d4c
This commit is contained in:
Salvatore Orlando 2013-05-13 13:12:46 +02:00
parent b99b020b29
commit 7ce9bc96ab
3 changed files with 105 additions and 2 deletions

View File

@ -55,6 +55,9 @@
"update_port:mac_learning_enabled": "rule:admin_or_network_owner", "update_port:mac_learning_enabled": "rule:admin_or_network_owner",
"delete_port": "rule:admin_or_owner", "delete_port": "rule:admin_or_owner",
"create_router:external_gateway_info:enable_snat": "rule:admin_only",
"update_router:external_gateway_info:enable_snat": "rule:admin_only",
"create_service_type": "rule:admin_only", "create_service_type": "rule:admin_only",
"update_service_type": "rule:admin_only", "update_service_type": "rule:admin_only",
"delete_service_type": "rule:admin_only", "delete_service_type": "rule:admin_only",

View File

@ -126,6 +126,31 @@ def _is_attribute_explicitly_set(attribute_name, resource, target):
target[attribute_name] != resource[attribute_name]['default']) target[attribute_name] != resource[attribute_name]['default'])
def _build_subattr_match_rule(attr_name, attr, action, target):
"""Create the rule to match for sub-attribute policy checks."""
# TODO(salv-orlando): Instead of relying on validator info, introduce
# typing for API attributes
# Expect a dict as type descriptor
validate = attr['validate']
key = filter(lambda k: k.startswith('type:dict'), validate.keys())
if not key:
LOG.warn(_("Unable to find data type descriptor for attribute %s"),
attr_name)
return
data = validate[key[0]]
if not isinstance(data, dict):
LOG.debug(_("Attribute type descriptor is not a dict. Unable to "
"generate any sub-attr policy rule for %s."),
attr_name)
return
sub_attr_rules = [policy.RuleCheck('rule', '%s:%s:%s' %
(action, attr_name,
sub_attr_name)) for
sub_attr_name in data if sub_attr_name in
target[attr_name]]
return policy.AndCheck(sub_attr_rules)
def _build_match_rule(action, target): def _build_match_rule(action, target):
"""Create the rule to match for a given action. """Create the rule to match for a given action.
@ -134,7 +159,9 @@ def _build_match_rule(action, target):
2) add an entry for the specific action (e.g.: create_network) 2) add an entry for the specific action (e.g.: create_network)
3) add an entry for attributes of a resource for which the action 3) add an entry for attributes of a resource for which the action
is being executed (e.g.: create_network:shared) is being executed (e.g.: create_network:shared)
4) add an entry for sub-attributes of a resource for which the
action is being executed
(e.g.: create_router:external_gateway_info:network_id)
""" """
match_rule = policy.RuleCheck('rule', action) match_rule = policy.RuleCheck('rule', action)
@ -152,8 +179,16 @@ def _build_match_rule(action, target):
if 'enforce_policy' in attribute: if 'enforce_policy' in attribute:
attr_rule = policy.RuleCheck('rule', '%s:%s' % attr_rule = policy.RuleCheck('rule', '%s:%s' %
(action, attribute_name)) (action, attribute_name))
# Build match entries for sub-attributes, if present
validate = attribute.get('validate')
if (validate and any([k.startswith('type:dict') and v
for (k, v) in
validate.iteritems()])):
attr_rule = policy.AndCheck(
[attr_rule, _build_subattr_match_rule(
attribute_name, attribute,
action, target)])
match_rule = policy.AndCheck([match_rule, attr_rule]) match_rule = policy.AndCheck([match_rule, attr_rule])
return match_rule return match_rule

View File

@ -23,6 +23,7 @@ import fixtures
import mock import mock
import neutron import neutron
from neutron.api.v2 import attributes
from neutron.common import exceptions from neutron.common import exceptions
from neutron import context from neutron import context
from neutron import manager from neutron import manager
@ -201,6 +202,19 @@ class DefaultPolicyTestCase(base.BaseTestCase):
self.context, "example:noexist", {}) self.context, "example:noexist", {})
FAKE_RESOURCE_NAME = 'something'
FAKE_RESOURCE = {"%ss" % FAKE_RESOURCE_NAME:
{'attr': {'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': None,
'enforce_policy': True,
'validate': {'type:dict':
{'sub_attr_1': {'type:string': None},
'sub_attr_2': {'type:string': None}}}
}}}
class NeutronPolicyTestCase(base.BaseTestCase): class NeutronPolicyTestCase(base.BaseTestCase):
def setUp(self): def setUp(self):
@ -210,6 +224,8 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self.addCleanup(policy.reset) self.addCleanup(policy.reset)
self.admin_only_legacy = "role:admin" self.admin_only_legacy = "role:admin"
self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s" self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
# Add a Fake 'something' resource to RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP.update(FAKE_RESOURCE)
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in { self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"context_is_admin": "role:admin", "context_is_admin": "role:admin",
"admin_or_network_owner": "rule:context_is_admin or " "admin_or_network_owner": "rule:context_is_admin or "
@ -231,16 +247,24 @@ class NeutronPolicyTestCase(base.BaseTestCase):
"rule:shared or " "rule:shared or "
"rule:external", "rule:external",
"create_port:mac": "rule:admin_or_network_owner", "create_port:mac": "rule:admin_or_network_owner",
"create_something": "rule:admin_or_owner",
"create_something:attr": "rule:admin_or_owner",
"create_something:attr:sub_attr_1": "rule:admin_or_owner",
"create_something:attr:sub_attr_2": "rule:admin_only"
}.items()) }.items())
def fakepolicyinit(): def fakepolicyinit():
common_policy.set_rules(common_policy.Rules(self.rules)) common_policy.set_rules(common_policy.Rules(self.rules))
def remove_fake_resource():
del attributes.RESOURCE_ATTRIBUTE_MAP["%ss" % FAKE_RESOURCE_NAME]
self.patcher = mock.patch.object(neutron.policy, self.patcher = mock.patch.object(neutron.policy,
'init', 'init',
new=fakepolicyinit) new=fakepolicyinit)
self.patcher.start() self.patcher.start()
self.addCleanup(self.patcher.stop) self.addCleanup(self.patcher.stop)
self.addCleanup(remove_fake_resource)
self.context = context.Context('fake', 'fake', roles=['user']) self.context = context.Context('fake', 'fake', roles=['user'])
plugin_klass = importutils.import_class( plugin_klass = importutils.import_class(
"neutron.db.db_base_plugin_v2.NeutronDbPluginV2") "neutron.db.db_base_plugin_v2.NeutronDbPluginV2")
@ -319,6 +343,47 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce, self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, action, target) self.context, action, target)
def _test_build_subattribute_match_rule(self, validate_value):
bk = FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate']
FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = (
validate_value)
action = "create_something"
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x'}}
self.assertFalse(policy._build_subattr_match_rule(
'attr',
FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr'],
action,
target))
FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = bk
def test_build_subattribute_match_rule_empty_dict_validator(self):
self._test_build_subattribute_match_rule({})
def test_build_subattribute_match_rule_wrong_validation_info(self):
self._test_build_subattribute_match_rule(
{'type:dict': 'wrong_stuff'})
def test_enforce_subattribute(self):
action = "create_something"
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x'}}
result = policy.enforce(self.context, action, target, None)
self.assertEqual(result, True)
def test_enforce_admin_only_subattribute(self):
action = "create_something"
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
'sub_attr_2': 'y'}}
result = policy.enforce(context.get_admin_context(),
action, target, None)
self.assertEqual(result, True)
def test_enforce_admin_only_subattribute_nonadminctx_returns_403(self):
action = "create_something"
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
'sub_attr_2': 'y'}}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, action, target, None)
def test_enforce_regularuser_on_read(self): def test_enforce_regularuser_on_read(self):
action = "get_network" action = "get_network"
target = {'shared': True, 'tenant_id': 'somebody_else'} target = {'shared': True, 'tenant_id': 'somebody_else'}