diff --git a/etc/policy.json b/etc/policy.json index 62a90b946..b85384e99 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -55,6 +55,9 @@ "update_port:mac_learning_enabled": "rule:admin_or_network_owner", "delete_port": "rule:admin_or_owner", + "create_router:external_gateway_info:enable_snat": "rule:admin_only", + "update_router:external_gateway_info:enable_snat": "rule:admin_only", + "create_service_type": "rule:admin_only", "update_service_type": "rule:admin_only", "delete_service_type": "rule:admin_only", diff --git a/neutron/policy.py b/neutron/policy.py index 28d8f60e9..f22894de1 100644 --- a/neutron/policy.py +++ b/neutron/policy.py @@ -126,6 +126,31 @@ def _is_attribute_explicitly_set(attribute_name, resource, target): target[attribute_name] != resource[attribute_name]['default']) +def _build_subattr_match_rule(attr_name, attr, action, target): + """Create the rule to match for sub-attribute policy checks.""" + # TODO(salv-orlando): Instead of relying on validator info, introduce + # typing for API attributes + # Expect a dict as type descriptor + validate = attr['validate'] + key = filter(lambda k: k.startswith('type:dict'), validate.keys()) + if not key: + LOG.warn(_("Unable to find data type descriptor for attribute %s"), + attr_name) + return + data = validate[key[0]] + if not isinstance(data, dict): + LOG.debug(_("Attribute type descriptor is not a dict. Unable to " + "generate any sub-attr policy rule for %s."), + attr_name) + return + sub_attr_rules = [policy.RuleCheck('rule', '%s:%s:%s' % + (action, attr_name, + sub_attr_name)) for + sub_attr_name in data if sub_attr_name in + target[attr_name]] + return policy.AndCheck(sub_attr_rules) + + def _build_match_rule(action, target): """Create the rule to match for a given action. @@ -134,7 +159,9 @@ def _build_match_rule(action, target): 2) add an entry for the specific action (e.g.: create_network) 3) add an entry for attributes of a resource for which the action is being executed (e.g.: create_network:shared) - + 4) add an entry for sub-attributes of a resource for which the + action is being executed + (e.g.: create_router:external_gateway_info:network_id) """ match_rule = policy.RuleCheck('rule', action) @@ -152,8 +179,16 @@ def _build_match_rule(action, target): if 'enforce_policy' in attribute: attr_rule = policy.RuleCheck('rule', '%s:%s' % (action, attribute_name)) + # Build match entries for sub-attributes, if present + validate = attribute.get('validate') + if (validate and any([k.startswith('type:dict') and v + for (k, v) in + validate.iteritems()])): + attr_rule = policy.AndCheck( + [attr_rule, _build_subattr_match_rule( + attribute_name, attribute, + action, target)]) match_rule = policy.AndCheck([match_rule, attr_rule]) - return match_rule diff --git a/neutron/tests/unit/test_policy.py b/neutron/tests/unit/test_policy.py index 76f1c79ba..d602cd93c 100644 --- a/neutron/tests/unit/test_policy.py +++ b/neutron/tests/unit/test_policy.py @@ -23,6 +23,7 @@ import fixtures import mock import neutron +from neutron.api.v2 import attributes from neutron.common import exceptions from neutron import context from neutron import manager @@ -201,6 +202,19 @@ class DefaultPolicyTestCase(base.BaseTestCase): self.context, "example:noexist", {}) +FAKE_RESOURCE_NAME = 'something' +FAKE_RESOURCE = {"%ss" % FAKE_RESOURCE_NAME: + {'attr': {'allow_post': True, + 'allow_put': True, + 'is_visible': True, + 'default': None, + 'enforce_policy': True, + 'validate': {'type:dict': + {'sub_attr_1': {'type:string': None}, + 'sub_attr_2': {'type:string': None}}} + }}} + + class NeutronPolicyTestCase(base.BaseTestCase): def setUp(self): @@ -210,6 +224,8 @@ class NeutronPolicyTestCase(base.BaseTestCase): self.addCleanup(policy.reset) self.admin_only_legacy = "role:admin" self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s" + # Add a Fake 'something' resource to RESOURCE_ATTRIBUTE_MAP + attributes.RESOURCE_ATTRIBUTE_MAP.update(FAKE_RESOURCE) self.rules = dict((k, common_policy.parse_rule(v)) for k, v in { "context_is_admin": "role:admin", "admin_or_network_owner": "rule:context_is_admin or " @@ -231,16 +247,24 @@ class NeutronPolicyTestCase(base.BaseTestCase): "rule:shared or " "rule:external", "create_port:mac": "rule:admin_or_network_owner", + "create_something": "rule:admin_or_owner", + "create_something:attr": "rule:admin_or_owner", + "create_something:attr:sub_attr_1": "rule:admin_or_owner", + "create_something:attr:sub_attr_2": "rule:admin_only" }.items()) def fakepolicyinit(): common_policy.set_rules(common_policy.Rules(self.rules)) + def remove_fake_resource(): + del attributes.RESOURCE_ATTRIBUTE_MAP["%ss" % FAKE_RESOURCE_NAME] + self.patcher = mock.patch.object(neutron.policy, 'init', new=fakepolicyinit) self.patcher.start() self.addCleanup(self.patcher.stop) + self.addCleanup(remove_fake_resource) self.context = context.Context('fake', 'fake', roles=['user']) plugin_klass = importutils.import_class( "neutron.db.db_base_plugin_v2.NeutronDbPluginV2") @@ -319,6 +343,47 @@ class NeutronPolicyTestCase(base.BaseTestCase): self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce, self.context, action, target) + def _test_build_subattribute_match_rule(self, validate_value): + bk = FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] + FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = ( + validate_value) + action = "create_something" + target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x'}} + self.assertFalse(policy._build_subattr_match_rule( + 'attr', + FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr'], + action, + target)) + FAKE_RESOURCE['%ss' % FAKE_RESOURCE_NAME]['attr']['validate'] = bk + + def test_build_subattribute_match_rule_empty_dict_validator(self): + self._test_build_subattribute_match_rule({}) + + def test_build_subattribute_match_rule_wrong_validation_info(self): + self._test_build_subattribute_match_rule( + {'type:dict': 'wrong_stuff'}) + + def test_enforce_subattribute(self): + action = "create_something" + target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x'}} + result = policy.enforce(self.context, action, target, None) + self.assertEqual(result, True) + + def test_enforce_admin_only_subattribute(self): + action = "create_something" + target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x', + 'sub_attr_2': 'y'}} + result = policy.enforce(context.get_admin_context(), + action, target, None) + self.assertEqual(result, True) + + def test_enforce_admin_only_subattribute_nonadminctx_returns_403(self): + action = "create_something" + target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x', + 'sub_attr_2': 'y'}} + self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce, + self.context, action, target, None) + def test_enforce_regularuser_on_read(self): action = "get_network" target = {'shared': True, 'tenant_id': 'somebody_else'}