diff --git a/neutron/policy.py b/neutron/policy.py index e1e8fa514c3..8b4f0bac02a 100644 --- a/neutron/policy.py +++ b/neutron/policy.py @@ -353,21 +353,43 @@ class FieldCheck(policy.Check): conv_func = lambda x: x self.field = field + self.resource = resource self.value = conv_func(value) self.regex = re.compile(value[1:]) if value.startswith('~') else None def __call__(self, target_dict, cred_dict, enforcer): - target_value = target_dict.get(self.field) + target_value = self._get_target_value(target_dict) # target_value might be a boolean, explicitly compare with None if target_value is None: - LOG.debug("Unable to find requested field: %(field)s in target: " - "%(target_dict)s", - {'field': self.field, 'target_dict': target_dict}) return False if self.regex: return bool(self.regex.match(target_value)) return target_value == self.value + def _get_target_value(self, target_dict): + if self.field in target_dict: + return target_dict[self.field] + # NOTE(slaweq): In case that target field is "networks:shared" we need + # to treat it in "special" way as it may be used for resources other + # than network, e.g. for port or subnet + target_value = None + if self.resource == "networks" and self.field == constants.SHARED: + target_network_id = target_dict.get("network_id") + if not target_network_id: + LOG.debug("Unable to find network_id field in target: " + "%(target_dict)s", + {'field': self.field, 'target_dict': target_dict}) + return + plugin = directory.get_plugin() + network = plugin.get_network( + context.get_admin_context(), target_network_id) + target_value = network.get(self.field) + if target_value is None: + LOG.debug("Unable to find requested field: %(field)s in target: " + "%(target_dict)s", + {'field': self.field, 'target_dict': target_dict}) + return target_value + def _prepare_check(context, action, target, pluralized): """Prepare rule, target, and credentials for the policy engine.""" diff --git a/neutron/tests/unit/test_policy.py b/neutron/tests/unit/test_policy.py index 80be10e047e..73136534d7f 100644 --- a/neutron/tests/unit/test_policy.py +++ b/neutron/tests/unit/test_policy.py @@ -28,11 +28,14 @@ from oslo_policy import fixture as op_fixture from oslo_policy import policy as oslo_policy from oslo_serialization import jsonutils from oslo_utils import importutils +from oslo_utils import uuidutils import neutron from neutron import policy from neutron.tests import base +_uuid = uuidutils.generate_uuid + class PolicyFileTestCase(base.BaseTestCase): def setUp(self): @@ -251,6 +254,14 @@ class NeutronPolicyTestCase(base.BaseTestCase): "create_port:mac": "rule:admin_or_network_owner or " "rule:context_is_advsvc", "create_port:device_owner": "not rule:network_device", + "create_port:fixed_ips": ( + "rule:context_is_advsvc or rule:admin_or_network_owner or " + "rule:shared"), + "create_port:fixed_ips:ip_address": ( + "rule:context_is_advsvc or rule:admin_or_network_owner"), + "create_port:fixed_ips:subnet_id": ( + "rule:context_is_advsvc or rule:admin_or_network_owner or " + "rule:shared"), "update_port": "rule:admin_or_owner or rule:context_is_advsvc", "get_port": "rule:admin_or_owner or rule:context_is_advsvc", "delete_port": "rule:admin_or_owner or rule:context_is_advsvc", @@ -281,10 +292,10 @@ class NeutronPolicyTestCase(base.BaseTestCase): result = policy.enforce(context, action, target) self.assertTrue(result) - def _test_nonadmin_action_on_attr(self, action, attr, value, + def _test_nonadmin_action_on_attr(self, action, obj, attr, value, exception=None, **kwargs): user_context = context.Context('', "user", roles=['user']) - self._test_action_on_attr(user_context, action, "network", attr, + self._test_action_on_attr(user_context, action, obj, attr, value, exception, **kwargs) def _test_advsvc_action_on_attr(self, action, obj, attr, value, @@ -295,15 +306,16 @@ class NeutronPolicyTestCase(base.BaseTestCase): value, exception, **kwargs) def test_nonadmin_write_on_private_fails(self): - self._test_nonadmin_action_on_attr('create', 'shared', False, - oslo_policy.PolicyNotAuthorized) + self._test_nonadmin_action_on_attr( + 'create', 'network', 'shared', False, + oslo_policy.PolicyNotAuthorized) def test_nonadmin_read_on_private_fails(self): - self._test_nonadmin_action_on_attr('get', 'shared', False, + self._test_nonadmin_action_on_attr('get', 'network', 'shared', False, oslo_policy.PolicyNotAuthorized) def test_nonadmin_write_on_shared_fails(self): - self._test_nonadmin_action_on_attr('create', 'shared', True, + self._test_nonadmin_action_on_attr('create', 'network', 'shared', True, oslo_policy.PolicyNotAuthorized) def test_create_port_device_owner_regex(self): @@ -322,6 +334,45 @@ class NeutronPolicyTestCase(base.BaseTestCase): 'create', 'port', 'device_owner', val ) + def test_create_port_fixed_ips_on_shared_network(self): + + def fakegetnetwork(*args, **kwargs): + return {'tenant_id': 'fake', + 'shared': True} + + kwargs = {'network_id': _uuid()} + with mock.patch.object(directory.get_plugin(), + 'get_network', new=fakegetnetwork): + self._test_nonadmin_action_on_attr( + 'create', 'port', + 'fixed_ips', [{'subnet_id': 'test-subnet-id'}], + **kwargs) + self._test_nonadmin_action_on_attr( + 'create', 'port', + 'fixed_ips', [{'ip_address': '1.2.3.4'}], + exception=oslo_policy.PolicyNotAuthorized, + **kwargs) + + def test_create_port_fixed_ips_on_nonshared_network(self): + + def fakegetnetwork(*args, **kwargs): + return {'tenant_id': 'fake', + 'shared': False} + + kwargs = {'network_id': _uuid()} + with mock.patch.object(directory.get_plugin(), + 'get_network', new=fakegetnetwork): + self._test_nonadmin_action_on_attr( + 'create', 'port', + 'fixed_ips', [{'subnet_id': 'test-subnet-id'}], + exception=oslo_policy.PolicyNotAuthorized, + **kwargs) + self._test_nonadmin_action_on_attr( + 'create', 'port', + 'fixed_ips', [{'ip_address': '1.2.3.4'}], + exception=oslo_policy.PolicyNotAuthorized, + **kwargs) + def test_advsvc_get_network_works(self): self._test_advsvc_action_on_attr('get', 'network', 'shared', False) @@ -348,7 +399,7 @@ class NeutronPolicyTestCase(base.BaseTestCase): oslo_policy.PolicyNotAuthorized) def test_nonadmin_read_on_shared_succeeds(self): - self._test_nonadmin_action_on_attr('get', 'shared', True) + self._test_nonadmin_action_on_attr('get', 'network', 'shared', True) def _test_enforce_adminonly_attribute(self, action, **kwargs): admin_context = context.get_admin_context() @@ -367,9 +418,10 @@ class NeutronPolicyTestCase(base.BaseTestCase): def test_reset_adminonly_attr_to_default_fails(self): kwargs = {constants.ATTRIBUTES_TO_UPDATE: ['shared']} - self._test_nonadmin_action_on_attr('update', 'shared', False, - oslo_policy.PolicyNotAuthorized, - **kwargs) + self._test_nonadmin_action_on_attr( + 'update', 'network', 'shared', False, + oslo_policy.PolicyNotAuthorized, + **kwargs) def test_enforce_adminonly_attribute_nonadminctx_returns_403(self): action = "create_network"