Fix handling of network:shared field in policy module

Some policy rules e.g. for create_port are using rule "network:shared"
in which "shared" field is related to network resource instead of
port directly.
Because of that, "shared" was missing from "target" in policy
enforce module thus validation wasn't working properly for such rule.

This patch fixes it by adding to FieldCheck checker possibility to
get network object and use its "shared" field to validate policy.

Change-Id: I56c99883fce40c37a5ee26e6e661c0cc0783c42f
Closes-Bug: #1808112
This commit is contained in:
Slawek Kaplonski 2019-04-15 14:17:27 +02:00
parent 9318fb8bb9
commit 0396912208
2 changed files with 88 additions and 14 deletions

View File

@ -353,21 +353,43 @@ class FieldCheck(policy.Check):
conv_func = lambda x: x
self.field = field
self.resource = resource
self.value = conv_func(value)
self.regex = re.compile(value[1:]) if value.startswith('~') else None
def __call__(self, target_dict, cred_dict, enforcer):
target_value = target_dict.get(self.field)
target_value = self._get_target_value(target_dict)
# target_value might be a boolean, explicitly compare with None
if target_value is None:
LOG.debug("Unable to find requested field: %(field)s in target: "
"%(target_dict)s",
{'field': self.field, 'target_dict': target_dict})
return False
if self.regex:
return bool(self.regex.match(target_value))
return target_value == self.value
def _get_target_value(self, target_dict):
if self.field in target_dict:
return target_dict[self.field]
# NOTE(slaweq): In case that target field is "networks:shared" we need
# to treat it in "special" way as it may be used for resources other
# than network, e.g. for port or subnet
target_value = None
if self.resource == "networks" and self.field == constants.SHARED:
target_network_id = target_dict.get("network_id")
if not target_network_id:
LOG.debug("Unable to find network_id field in target: "
"%(target_dict)s",
{'field': self.field, 'target_dict': target_dict})
return
plugin = directory.get_plugin()
network = plugin.get_network(
context.get_admin_context(), target_network_id)
target_value = network.get(self.field)
if target_value is None:
LOG.debug("Unable to find requested field: %(field)s in target: "
"%(target_dict)s",
{'field': self.field, 'target_dict': target_dict})
return target_value
def _prepare_check(context, action, target, pluralized):
"""Prepare rule, target, and credentials for the policy engine."""

View File

@ -28,11 +28,14 @@ from oslo_policy import fixture as op_fixture
from oslo_policy import policy as oslo_policy
from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslo_utils import uuidutils
import neutron
from neutron import policy
from neutron.tests import base
_uuid = uuidutils.generate_uuid
class PolicyFileTestCase(base.BaseTestCase):
def setUp(self):
@ -251,6 +254,14 @@ class NeutronPolicyTestCase(base.BaseTestCase):
"create_port:mac": "rule:admin_or_network_owner or "
"rule:context_is_advsvc",
"create_port:device_owner": "not rule:network_device",
"create_port:fixed_ips": (
"rule:context_is_advsvc or rule:admin_or_network_owner or "
"rule:shared"),
"create_port:fixed_ips:ip_address": (
"rule:context_is_advsvc or rule:admin_or_network_owner"),
"create_port:fixed_ips:subnet_id": (
"rule:context_is_advsvc or rule:admin_or_network_owner or "
"rule:shared"),
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_port": "rule:admin_or_owner or rule:context_is_advsvc",
"delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
@ -281,10 +292,10 @@ class NeutronPolicyTestCase(base.BaseTestCase):
result = policy.enforce(context, action, target)
self.assertTrue(result)
def _test_nonadmin_action_on_attr(self, action, attr, value,
def _test_nonadmin_action_on_attr(self, action, obj, attr, value,
exception=None, **kwargs):
user_context = context.Context('', "user", roles=['user'])
self._test_action_on_attr(user_context, action, "network", attr,
self._test_action_on_attr(user_context, action, obj, attr,
value, exception, **kwargs)
def _test_advsvc_action_on_attr(self, action, obj, attr, value,
@ -295,15 +306,16 @@ class NeutronPolicyTestCase(base.BaseTestCase):
value, exception, **kwargs)
def test_nonadmin_write_on_private_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', False,
oslo_policy.PolicyNotAuthorized)
self._test_nonadmin_action_on_attr(
'create', 'network', 'shared', False,
oslo_policy.PolicyNotAuthorized)
def test_nonadmin_read_on_private_fails(self):
self._test_nonadmin_action_on_attr('get', 'shared', False,
self._test_nonadmin_action_on_attr('get', 'network', 'shared', False,
oslo_policy.PolicyNotAuthorized)
def test_nonadmin_write_on_shared_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', True,
self._test_nonadmin_action_on_attr('create', 'network', 'shared', True,
oslo_policy.PolicyNotAuthorized)
def test_create_port_device_owner_regex(self):
@ -322,6 +334,45 @@ class NeutronPolicyTestCase(base.BaseTestCase):
'create', 'port', 'device_owner', val
)
def test_create_port_fixed_ips_on_shared_network(self):
def fakegetnetwork(*args, **kwargs):
return {'tenant_id': 'fake',
'shared': True}
kwargs = {'network_id': _uuid()}
with mock.patch.object(directory.get_plugin(),
'get_network', new=fakegetnetwork):
self._test_nonadmin_action_on_attr(
'create', 'port',
'fixed_ips', [{'subnet_id': 'test-subnet-id'}],
**kwargs)
self._test_nonadmin_action_on_attr(
'create', 'port',
'fixed_ips', [{'ip_address': '1.2.3.4'}],
exception=oslo_policy.PolicyNotAuthorized,
**kwargs)
def test_create_port_fixed_ips_on_nonshared_network(self):
def fakegetnetwork(*args, **kwargs):
return {'tenant_id': 'fake',
'shared': False}
kwargs = {'network_id': _uuid()}
with mock.patch.object(directory.get_plugin(),
'get_network', new=fakegetnetwork):
self._test_nonadmin_action_on_attr(
'create', 'port',
'fixed_ips', [{'subnet_id': 'test-subnet-id'}],
exception=oslo_policy.PolicyNotAuthorized,
**kwargs)
self._test_nonadmin_action_on_attr(
'create', 'port',
'fixed_ips', [{'ip_address': '1.2.3.4'}],
exception=oslo_policy.PolicyNotAuthorized,
**kwargs)
def test_advsvc_get_network_works(self):
self._test_advsvc_action_on_attr('get', 'network', 'shared', False)
@ -348,7 +399,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
oslo_policy.PolicyNotAuthorized)
def test_nonadmin_read_on_shared_succeeds(self):
self._test_nonadmin_action_on_attr('get', 'shared', True)
self._test_nonadmin_action_on_attr('get', 'network', 'shared', True)
def _test_enforce_adminonly_attribute(self, action, **kwargs):
admin_context = context.get_admin_context()
@ -367,9 +418,10 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_reset_adminonly_attr_to_default_fails(self):
kwargs = {constants.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_nonadmin_action_on_attr('update', 'shared', False,
oslo_policy.PolicyNotAuthorized,
**kwargs)
self._test_nonadmin_action_on_attr(
'update', 'network', 'shared', False,
oslo_policy.PolicyNotAuthorized,
**kwargs)
def test_enforce_adminonly_attribute_nonadminctx_returns_403(self):
action = "create_network"