Merge "Fix handling of network:shared field in policy module"

This commit is contained in:
Zuul 2019-05-09 00:09:33 +00:00 committed by Gerrit Code Review
commit d0e64c6183
2 changed files with 88 additions and 14 deletions
neutron

@ -353,21 +353,43 @@ class FieldCheck(policy.Check):
conv_func = lambda x: x
self.field = field
self.resource = resource
self.value = conv_func(value)
self.regex = re.compile(value[1:]) if value.startswith('~') else None
def __call__(self, target_dict, cred_dict, enforcer):
target_value = target_dict.get(self.field)
target_value = self._get_target_value(target_dict)
# target_value might be a boolean, explicitly compare with None
if target_value is None:
LOG.debug("Unable to find requested field: %(field)s in target: "
"%(target_dict)s",
{'field': self.field, 'target_dict': target_dict})
return False
if self.regex:
return bool(self.regex.match(target_value))
return target_value == self.value
def _get_target_value(self, target_dict):
if self.field in target_dict:
return target_dict[self.field]
# NOTE(slaweq): In case that target field is "networks:shared" we need
# to treat it in "special" way as it may be used for resources other
# than network, e.g. for port or subnet
target_value = None
if self.resource == "networks" and self.field == constants.SHARED:
target_network_id = target_dict.get("network_id")
if not target_network_id:
LOG.debug("Unable to find network_id field in target: "
"%(target_dict)s",
{'field': self.field, 'target_dict': target_dict})
return
plugin = directory.get_plugin()
network = plugin.get_network(
context.get_admin_context(), target_network_id)
target_value = network.get(self.field)
if target_value is None:
LOG.debug("Unable to find requested field: %(field)s in target: "
"%(target_dict)s",
{'field': self.field, 'target_dict': target_dict})
return target_value
def _prepare_check(context, action, target, pluralized):
"""Prepare rule, target, and credentials for the policy engine."""

@ -28,11 +28,14 @@ from oslo_policy import fixture as op_fixture
from oslo_policy import policy as oslo_policy
from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslo_utils import uuidutils
import neutron
from neutron import policy
from neutron.tests import base
_uuid = uuidutils.generate_uuid
class PolicyFileTestCase(base.BaseTestCase):
def setUp(self):
@ -251,6 +254,14 @@ class NeutronPolicyTestCase(base.BaseTestCase):
"create_port:mac": "rule:admin_or_network_owner or "
"rule:context_is_advsvc",
"create_port:device_owner": "not rule:network_device",
"create_port:fixed_ips": (
"rule:context_is_advsvc or rule:admin_or_network_owner or "
"rule:shared"),
"create_port:fixed_ips:ip_address": (
"rule:context_is_advsvc or rule:admin_or_network_owner"),
"create_port:fixed_ips:subnet_id": (
"rule:context_is_advsvc or rule:admin_or_network_owner or "
"rule:shared"),
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_port": "rule:admin_or_owner or rule:context_is_advsvc",
"delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
@ -281,10 +292,10 @@ class NeutronPolicyTestCase(base.BaseTestCase):
result = policy.enforce(context, action, target)
self.assertTrue(result)
def _test_nonadmin_action_on_attr(self, action, attr, value,
def _test_nonadmin_action_on_attr(self, action, obj, attr, value,
exception=None, **kwargs):
user_context = context.Context('', "user", roles=['user'])
self._test_action_on_attr(user_context, action, "network", attr,
self._test_action_on_attr(user_context, action, obj, attr,
value, exception, **kwargs)
def _test_advsvc_action_on_attr(self, action, obj, attr, value,
@ -295,15 +306,16 @@ class NeutronPolicyTestCase(base.BaseTestCase):
value, exception, **kwargs)
def test_nonadmin_write_on_private_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', False,
self._test_nonadmin_action_on_attr(
'create', 'network', 'shared', False,
oslo_policy.PolicyNotAuthorized)
def test_nonadmin_read_on_private_fails(self):
self._test_nonadmin_action_on_attr('get', 'shared', False,
self._test_nonadmin_action_on_attr('get', 'network', 'shared', False,
oslo_policy.PolicyNotAuthorized)
def test_nonadmin_write_on_shared_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', True,
self._test_nonadmin_action_on_attr('create', 'network', 'shared', True,
oslo_policy.PolicyNotAuthorized)
def test_create_port_device_owner_regex(self):
@ -322,6 +334,45 @@ class NeutronPolicyTestCase(base.BaseTestCase):
'create', 'port', 'device_owner', val
)
def test_create_port_fixed_ips_on_shared_network(self):
def fakegetnetwork(*args, **kwargs):
return {'tenant_id': 'fake',
'shared': True}
kwargs = {'network_id': _uuid()}
with mock.patch.object(directory.get_plugin(),
'get_network', new=fakegetnetwork):
self._test_nonadmin_action_on_attr(
'create', 'port',
'fixed_ips', [{'subnet_id': 'test-subnet-id'}],
**kwargs)
self._test_nonadmin_action_on_attr(
'create', 'port',
'fixed_ips', [{'ip_address': '1.2.3.4'}],
exception=oslo_policy.PolicyNotAuthorized,
**kwargs)
def test_create_port_fixed_ips_on_nonshared_network(self):
def fakegetnetwork(*args, **kwargs):
return {'tenant_id': 'fake',
'shared': False}
kwargs = {'network_id': _uuid()}
with mock.patch.object(directory.get_plugin(),
'get_network', new=fakegetnetwork):
self._test_nonadmin_action_on_attr(
'create', 'port',
'fixed_ips', [{'subnet_id': 'test-subnet-id'}],
exception=oslo_policy.PolicyNotAuthorized,
**kwargs)
self._test_nonadmin_action_on_attr(
'create', 'port',
'fixed_ips', [{'ip_address': '1.2.3.4'}],
exception=oslo_policy.PolicyNotAuthorized,
**kwargs)
def test_advsvc_get_network_works(self):
self._test_advsvc_action_on_attr('get', 'network', 'shared', False)
@ -348,7 +399,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
oslo_policy.PolicyNotAuthorized)
def test_nonadmin_read_on_shared_succeeds(self):
self._test_nonadmin_action_on_attr('get', 'shared', True)
self._test_nonadmin_action_on_attr('get', 'network', 'shared', True)
def _test_enforce_adminonly_attribute(self, action, **kwargs):
admin_context = context.get_admin_context()
@ -367,7 +418,8 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_reset_adminonly_attr_to_default_fails(self):
kwargs = {constants.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_nonadmin_action_on_attr('update', 'shared', False,
self._test_nonadmin_action_on_attr(
'update', 'network', 'shared', False,
oslo_policy.PolicyNotAuthorized,
**kwargs)