Forbid regular users to reset admin-only attrs to default values

A regular user can reset an admin-only attribute to its default
value due to the fact that a corresponding policy rule is
enforced only in the case when an attribute is present in the
target AND has a non-default value.

Added a new attribute "attributes_to_update" which contains a list
of all to-be updated attributes to the body of the target that is
passed to policy.enforce.

Changed a check for whether an attribute is explicitly set.
Now, in the case of update, the function should not pay attention
to a default value of an attribute, but check whether it was
explicitly marked as being updated.

Added unit-tests.

Conflicts:
	neutron/common/constants.py

Closes-Bug: #1357379
Related-Bug: #1338880
Change-Id: I6537bb1da5ef0d6899bc71e4e949f2c760c103c2
(cherry picked from commit 74d1093990)
This commit is contained in:
Elena Ezhova 2014-08-19 15:54:36 +04:00 committed by Ihar Hrachyshka
parent f9a7b7f96f
commit dd4b77ff53
5 changed files with 58 additions and 13 deletions

View File

@ -520,6 +520,10 @@ class Controller(object):
parent_id=parent_id) parent_id=parent_id)
orig_object_copy = copy.copy(orig_obj) orig_object_copy = copy.copy(orig_obj)
orig_obj.update(body[self._resource]) orig_obj.update(body[self._resource])
# Make a list of attributes to be updated to inform the policy engine
# which attributes are set explicitly so that it can distinguish them
# from the ones that are set to their default values.
orig_obj[const.ATTRIBUTES_TO_UPDATE] = body[self._resource].keys()
try: try:
policy.enforce(request.context, policy.enforce(request.context,
action, action,

View File

@ -115,3 +115,5 @@ DHCPV6_STATEFUL = 'dhcpv6-stateful'
DHCPV6_STATELESS = 'dhcpv6-stateless' DHCPV6_STATELESS = 'dhcpv6-stateless'
IPV6_SLAAC = 'slaac' IPV6_SLAAC = 'slaac'
IPV6_MODES = [DHCPV6_STATEFUL, DHCPV6_STATELESS, IPV6_SLAAC] IPV6_MODES = [DHCPV6_STATEFUL, DHCPV6_STATELESS, IPV6_SLAAC]
ATTRIBUTES_TO_UPDATE = 'attributes_to_update'

View File

@ -18,12 +18,15 @@
""" """
Policy engine for neutron. Largely copied from nova. Policy engine for neutron. Largely copied from nova.
""" """
import collections
import itertools import itertools
import re import re
from oslo.config import cfg from oslo.config import cfg
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions from neutron.common import exceptions
import neutron.common.utils as utils import neutron.common.utils as utils
from neutron import manager from neutron import manager
@ -119,14 +122,28 @@ def _set_rules(data):
policy.set_rules(policies) policy.set_rules(policies)
def _is_attribute_explicitly_set(attribute_name, resource, target): def _is_attribute_explicitly_set(attribute_name, resource, target, action):
"""Verify that an attribute is present and has a non-default value.""" """Verify that an attribute is present and is explicitly set."""
if 'update' in action:
# In the case of update, the function should not pay attention to a
# default value of an attribute, but check whether it was explicitly
# marked as being updated instead.
return (attribute_name in target[const.ATTRIBUTES_TO_UPDATE] and
target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED)
return ('default' in resource[attribute_name] and return ('default' in resource[attribute_name] and
attribute_name in target and attribute_name in target and
target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED and target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED and
target[attribute_name] != resource[attribute_name]['default']) target[attribute_name] != resource[attribute_name]['default'])
def _should_validate_sub_attributes(attribute, sub_attr):
"""Verify that sub-attributes are iterable and should be validated."""
validate = attribute.get('validate')
return (validate and isinstance(sub_attr, collections.Iterable) and
any([k.startswith('type:dict') and
v for (k, v) in validate.iteritems()]))
def _build_subattr_match_rule(attr_name, attr, action, target): def _build_subattr_match_rule(attr_name, attr, action, target):
"""Create the rule to match for sub-attribute policy checks.""" """Create the rule to match for sub-attribute policy checks."""
# TODO(salv-orlando): Instead of relying on validator info, introduce # TODO(salv-orlando): Instead of relying on validator info, introduce
@ -174,16 +191,14 @@ def _build_match_rule(action, target):
for attribute_name in res_map[resource]: for attribute_name in res_map[resource]:
if _is_attribute_explicitly_set(attribute_name, if _is_attribute_explicitly_set(attribute_name,
res_map[resource], res_map[resource],
target): target, action):
attribute = res_map[resource][attribute_name] attribute = res_map[resource][attribute_name]
if 'enforce_policy' in attribute: if 'enforce_policy' in attribute:
attr_rule = policy.RuleCheck('rule', '%s:%s' % attr_rule = policy.RuleCheck('rule', '%s:%s' %
(action, attribute_name)) (action, attribute_name))
# Build match entries for sub-attributes, if present # Build match entries for sub-attributes
validate = attribute.get('validate') if _should_validate_sub_attributes(
if (validate and any([k.startswith('type:dict') and v attribute, target[attribute_name]):
for (k, v) in
validate.iteritems()])):
attr_rule = policy.AndCheck( attr_rule = policy.AndCheck(
[attr_rule, _build_subattr_match_rule( [attr_rule, _build_subattr_match_rule(
attribute_name, attribute, attribute_name, attribute,

View File

@ -1208,6 +1208,18 @@ class SubresourceTest(base.BaseTestCase):
network_id='id1', network_id='id1',
dummy=body) dummy=body)
def test_update_subresource_to_none(self):
instance = self.plugin.return_value
dummy_id = _uuid()
body = {'dummy': {}}
self.api.put_json('/networks/id1' + _get_path('dummies', id=dummy_id),
body)
instance.update_network_dummy.assert_called_once_with(mock.ANY,
dummy_id,
network_id='id1',
dummy=body)
def test_delete_sub_resource(self): def test_delete_sub_resource(self):
instance = self.plugin.return_value instance = self.plugin.return_value

View File

@ -24,6 +24,7 @@ import six
import neutron import neutron
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions from neutron.common import exceptions
from neutron import context from neutron import context
from neutron import manager from neutron import manager
@ -284,9 +285,11 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self.addCleanup(self.manager_patcher.stop) self.addCleanup(self.manager_patcher.stop)
def _test_action_on_attr(self, context, action, attr, value, def _test_action_on_attr(self, context, action, attr, value,
exception=None): exception=None, **kwargs):
action = "%s_network" % action action = "%s_network" % action
target = {'tenant_id': 'the_owner', attr: value} target = {'tenant_id': 'the_owner', attr: value}
if kwargs:
target.update(kwargs)
if exception: if exception:
self.assertRaises(exception, policy.enforce, self.assertRaises(exception, policy.enforce,
context, action, target) context, action, target)
@ -295,10 +298,10 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self.assertEqual(result, True) self.assertEqual(result, True)
def _test_nonadmin_action_on_attr(self, action, attr, value, def _test_nonadmin_action_on_attr(self, action, attr, value,
exception=None): exception=None, **kwargs):
user_context = context.Context('', "user", roles=['user']) user_context = context.Context('', "user", roles=['user'])
self._test_action_on_attr(user_context, action, attr, self._test_action_on_attr(user_context, action, attr,
value, exception) value, exception, **kwargs)
def test_nonadmin_write_on_private_fails(self): def test_nonadmin_write_on_private_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', False, self._test_nonadmin_action_on_attr('create', 'shared', False,
@ -315,9 +318,11 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_nonadmin_read_on_shared_succeeds(self): def test_nonadmin_read_on_shared_succeeds(self):
self._test_nonadmin_action_on_attr('get', 'shared', True) self._test_nonadmin_action_on_attr('get', 'shared', True)
def _test_enforce_adminonly_attribute(self, action): def _test_enforce_adminonly_attribute(self, action, **kwargs):
admin_context = context.get_admin_context() admin_context = context.get_admin_context()
target = {'shared': True} target = {'shared': True}
if kwargs:
target.update(kwargs)
result = policy.enforce(admin_context, action, target) result = policy.enforce(admin_context, action, target)
self.assertEqual(result, True) self.assertEqual(result, True)
@ -325,7 +330,14 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self._test_enforce_adminonly_attribute('create_network') self._test_enforce_adminonly_attribute('create_network')
def test_enforce_adminonly_attribute_update(self): def test_enforce_adminonly_attribute_update(self):
self._test_enforce_adminonly_attribute('update_network') kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_enforce_adminonly_attribute('update_network', **kwargs)
def test_reset_adminonly_attr_to_default_fails(self):
kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_nonadmin_action_on_attr('update', 'shared', False,
exceptions.PolicyNotAuthorized,
**kwargs)
def test_enforce_adminonly_attribute_no_context_is_admin_policy(self): def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
del self.rules[policy.ADMIN_CTX_POLICY] del self.rules[policy.ADMIN_CTX_POLICY]