From 039691220817db3519149b5e2fd2607365f3d8af Mon Sep 17 00:00:00 2001 From: Slawek Kaplonski Date: Mon, 15 Apr 2019 14:17:27 +0200 Subject: [PATCH] Fix handling of network:shared field in policy module Some policy rules e.g. for create_port are using rule "network:shared" in which "shared" field is related to network resource instead of port directly. Because of that, "shared" was missing from "target" in policy enforce module thus validation wasn't working properly for such rule. This patch fixes it by adding to FieldCheck checker possibility to get network object and use its "shared" field to validate policy. Change-Id: I56c99883fce40c37a5ee26e6e661c0cc0783c42f Closes-Bug: #1808112 --- neutron/policy.py | 30 +++++++++++-- neutron/tests/unit/test_policy.py | 72 ++++++++++++++++++++++++++----- 2 files changed, 88 insertions(+), 14 deletions(-) diff --git a/neutron/policy.py b/neutron/policy.py index e1e8fa514c3..8b4f0bac02a 100644 --- a/neutron/policy.py +++ b/neutron/policy.py @@ -353,21 +353,43 @@ class FieldCheck(policy.Check): conv_func = lambda x: x self.field = field + self.resource = resource self.value = conv_func(value) self.regex = re.compile(value[1:]) if value.startswith('~') else None def __call__(self, target_dict, cred_dict, enforcer): - target_value = target_dict.get(self.field) + target_value = self._get_target_value(target_dict) # target_value might be a boolean, explicitly compare with None if target_value is None: - LOG.debug("Unable to find requested field: %(field)s in target: " - "%(target_dict)s", - {'field': self.field, 'target_dict': target_dict}) return False if self.regex: return bool(self.regex.match(target_value)) return target_value == self.value + def _get_target_value(self, target_dict): + if self.field in target_dict: + return target_dict[self.field] + # NOTE(slaweq): In case that target field is "networks:shared" we need + # to treat it in "special" way as it may be used for resources other + # than network, e.g. for port or subnet + target_value = None + if self.resource == "networks" and self.field == constants.SHARED: + target_network_id = target_dict.get("network_id") + if not target_network_id: + LOG.debug("Unable to find network_id field in target: " + "%(target_dict)s", + {'field': self.field, 'target_dict': target_dict}) + return + plugin = directory.get_plugin() + network = plugin.get_network( + context.get_admin_context(), target_network_id) + target_value = network.get(self.field) + if target_value is None: + LOG.debug("Unable to find requested field: %(field)s in target: " + "%(target_dict)s", + {'field': self.field, 'target_dict': target_dict}) + return target_value + def _prepare_check(context, action, target, pluralized): """Prepare rule, target, and credentials for the policy engine.""" diff --git a/neutron/tests/unit/test_policy.py b/neutron/tests/unit/test_policy.py index 80be10e047e..73136534d7f 100644 --- a/neutron/tests/unit/test_policy.py +++ b/neutron/tests/unit/test_policy.py @@ -28,11 +28,14 @@ from oslo_policy import fixture as op_fixture from oslo_policy import policy as oslo_policy from oslo_serialization import jsonutils from oslo_utils import importutils +from oslo_utils import uuidutils import neutron from neutron import policy from neutron.tests import base +_uuid = uuidutils.generate_uuid + class PolicyFileTestCase(base.BaseTestCase): def setUp(self): @@ -251,6 +254,14 @@ class NeutronPolicyTestCase(base.BaseTestCase): "create_port:mac": "rule:admin_or_network_owner or " "rule:context_is_advsvc", "create_port:device_owner": "not rule:network_device", + "create_port:fixed_ips": ( + "rule:context_is_advsvc or rule:admin_or_network_owner or " + "rule:shared"), + "create_port:fixed_ips:ip_address": ( + "rule:context_is_advsvc or rule:admin_or_network_owner"), + "create_port:fixed_ips:subnet_id": ( + "rule:context_is_advsvc or rule:admin_or_network_owner or " + "rule:shared"), "update_port": "rule:admin_or_owner or rule:context_is_advsvc", "get_port": "rule:admin_or_owner or rule:context_is_advsvc", "delete_port": "rule:admin_or_owner or rule:context_is_advsvc", @@ -281,10 +292,10 @@ class NeutronPolicyTestCase(base.BaseTestCase): result = policy.enforce(context, action, target) self.assertTrue(result) - def _test_nonadmin_action_on_attr(self, action, attr, value, + def _test_nonadmin_action_on_attr(self, action, obj, attr, value, exception=None, **kwargs): user_context = context.Context('', "user", roles=['user']) - self._test_action_on_attr(user_context, action, "network", attr, + self._test_action_on_attr(user_context, action, obj, attr, value, exception, **kwargs) def _test_advsvc_action_on_attr(self, action, obj, attr, value, @@ -295,15 +306,16 @@ class NeutronPolicyTestCase(base.BaseTestCase): value, exception, **kwargs) def test_nonadmin_write_on_private_fails(self): - self._test_nonadmin_action_on_attr('create', 'shared', False, - oslo_policy.PolicyNotAuthorized) + self._test_nonadmin_action_on_attr( + 'create', 'network', 'shared', False, + oslo_policy.PolicyNotAuthorized) def test_nonadmin_read_on_private_fails(self): - self._test_nonadmin_action_on_attr('get', 'shared', False, + self._test_nonadmin_action_on_attr('get', 'network', 'shared', False, oslo_policy.PolicyNotAuthorized) def test_nonadmin_write_on_shared_fails(self): - self._test_nonadmin_action_on_attr('create', 'shared', True, + self._test_nonadmin_action_on_attr('create', 'network', 'shared', True, oslo_policy.PolicyNotAuthorized) def test_create_port_device_owner_regex(self): @@ -322,6 +334,45 @@ class NeutronPolicyTestCase(base.BaseTestCase): 'create', 'port', 'device_owner', val ) + def test_create_port_fixed_ips_on_shared_network(self): + + def fakegetnetwork(*args, **kwargs): + return {'tenant_id': 'fake', + 'shared': True} + + kwargs = {'network_id': _uuid()} + with mock.patch.object(directory.get_plugin(), + 'get_network', new=fakegetnetwork): + self._test_nonadmin_action_on_attr( + 'create', 'port', + 'fixed_ips', [{'subnet_id': 'test-subnet-id'}], + **kwargs) + self._test_nonadmin_action_on_attr( + 'create', 'port', + 'fixed_ips', [{'ip_address': '1.2.3.4'}], + exception=oslo_policy.PolicyNotAuthorized, + **kwargs) + + def test_create_port_fixed_ips_on_nonshared_network(self): + + def fakegetnetwork(*args, **kwargs): + return {'tenant_id': 'fake', + 'shared': False} + + kwargs = {'network_id': _uuid()} + with mock.patch.object(directory.get_plugin(), + 'get_network', new=fakegetnetwork): + self._test_nonadmin_action_on_attr( + 'create', 'port', + 'fixed_ips', [{'subnet_id': 'test-subnet-id'}], + exception=oslo_policy.PolicyNotAuthorized, + **kwargs) + self._test_nonadmin_action_on_attr( + 'create', 'port', + 'fixed_ips', [{'ip_address': '1.2.3.4'}], + exception=oslo_policy.PolicyNotAuthorized, + **kwargs) + def test_advsvc_get_network_works(self): self._test_advsvc_action_on_attr('get', 'network', 'shared', False) @@ -348,7 +399,7 @@ class NeutronPolicyTestCase(base.BaseTestCase): oslo_policy.PolicyNotAuthorized) def test_nonadmin_read_on_shared_succeeds(self): - self._test_nonadmin_action_on_attr('get', 'shared', True) + self._test_nonadmin_action_on_attr('get', 'network', 'shared', True) def _test_enforce_adminonly_attribute(self, action, **kwargs): admin_context = context.get_admin_context() @@ -367,9 +418,10 @@ class NeutronPolicyTestCase(base.BaseTestCase): def test_reset_adminonly_attr_to_default_fails(self): kwargs = {constants.ATTRIBUTES_TO_UPDATE: ['shared']} - self._test_nonadmin_action_on_attr('update', 'shared', False, - oslo_policy.PolicyNotAuthorized, - **kwargs) + self._test_nonadmin_action_on_attr( + 'update', 'network', 'shared', False, + oslo_policy.PolicyNotAuthorized, + **kwargs) def test_enforce_adminonly_attribute_nonadminctx_returns_403(self): action = "create_network"