Merge "[S-RBAC] Fix policies for the SG rules API"

This commit is contained in:
Zuul
2025-03-13 09:09:43 +00:00
committed by Gerrit Code Review
6 changed files with 99 additions and 26 deletions

View File

@@ -83,6 +83,19 @@ ADMIN_OR_NET_OWNER_MEMBER = (
ADMIN_OR_NET_OWNER_READER = (
'(' + ADMIN + ') or (' + NET_OWNER_READER + ')')
# Those rules for the SG owner are needed for the policies related to the
# Security Group rules and are very similar to the parent owner rules defined
# above. We should probably deprecate SG_OWNER rules and use PARENT_OWNER
# instead but this can be done later
# TODO(slaweq): Deprecate SG_OWNER rules and replace them with PARENT_OWNER
# rules but for that, 'ext_parent_owner:tenant_id' needs to be added to the SG
# rule target dict
SG_OWNER_MEMBER = 'role:member and ' + RULE_SG_OWNER
SG_OWNER_READER = 'role:reader and ' + RULE_SG_OWNER
ADMIN_OR_SG_OWNER_MEMBER = (
'(' + ADMIN + ') or (' + SG_OWNER_MEMBER + ')')
ADMIN_OR_SG_OWNER_READER = (
'(' + ADMIN + ') or (' + SG_OWNER_READER + ')')
rules = [
policy.RuleDefault(

View File

@@ -208,11 +208,9 @@ rules = [
deprecated_since="2025.1")
),
# TODO(amotoki): admin_or_owner is the right rule?
# Does an empty string make more sense for create_security_group_rule?
policy.DocumentedRuleDefault(
name='create_security_group_rule',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
check_str=base.ADMIN_OR_SG_OWNER_MEMBER,
scope_types=['project'],
description='Create a security group rule',
operations=[
@@ -229,9 +227,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_security_group_rule',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_PROJECT_READER,
base.RULE_SG_OWNER),
check_str=base.ADMIN_OR_SG_OWNER_READER,
scope_types=['project'],
description='Get a security group rule',
operations=[
@@ -252,7 +248,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_security_group_rule',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
check_str=base.ADMIN_OR_SG_OWNER_MEMBER,
scope_types=['project'],
description='Delete a security group rule',
operations=[

View File

@@ -27,7 +27,6 @@ from neutron_lib import exceptions
from neutron_lib.plugins import directory
from neutron_lib.services import constants as service_const
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_policy import policy
from oslo_utils import excutils
@@ -292,7 +291,8 @@ class OwnerCheck(policy.Check):
return OwnerCheck(self._orig_kind, self._orig_match)
@cache.cache_method_results
def _extract(self, resource_type, resource_id, field):
def _extract(self, resource_type, resource_id, field,
retry_if_not_found=True):
# NOTE(salv-orlando): This check currently assumes the parent
# resource is handled by the core plugin. It might be worth
# having a way to map resources to plugins so to make this
@@ -309,12 +309,15 @@ class OwnerCheck(policy.Check):
resource_id,
fields=[field])
except exceptions.NotFound as e:
# NOTE(kevinbenton): a NotFound exception can occur if a
# NOTE(kevinbenton, slaweq): a NotFound exception can occur if a
# list operation is happening at the same time as one of
# the parents and its children being deleted. So we issue
# a RetryRequest so the API will redo the lookup and the
# retry to get it once again if we didn't yet.
# problem items will be gone.
raise db_exc.RetryRequest(e)
if retry_if_not_found:
return self._extract(resource_type, resource_id, field,
retry_if_not_found=False)
raise e
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception('Policy check error while calling %s!', f)

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from unittest import mock
from oslo_policy import policy as base_policy
@@ -388,19 +389,35 @@ class SecurityGroupRuleAPITestCase(base.PolicyBaseTestCase):
super().setUp()
self.sg = {
'id': uuidutils.generate_uuid(),
'project_id': self.project_id}
'project_id': self.project_id,
'tenant_id': self.project_id}
self.alt_sg = {
'id': uuidutils.generate_uuid(),
'project_id': self.alt_project_id,
'tenant_id': self.alt_project_id}
self.target = {
'project_id': self.project_id,
'tenant_id': self.project_id,
'security_group_id': self.sg['id'],
'ext_parent:tenant_id': self.sg['id'],
'ext_parent_security_group_id': self.sg['id']}
self.alt_target = {
'project_id': self.alt_project_id,
'security_group_id': self.sg['id'],
'ext_parent_security_group_id': self.sg['id']}
'tenant_id': self.alt_project_id,
'security_group_id': self.alt_sg['id'],
'ext_parent:tenant_id': self.alt_sg['id'],
'ext_parent_security_group_id': self.alt_sg['id']}
def get_security_group_mock(context, id,
fields=None, tenant_id=None):
if id == self.alt_sg['id']:
return self.alt_sg
return self.sg
self.plugin_mock = mock.Mock()
self.plugin_mock.get_security_group.return_value = self.sg
self.plugin_mock.get_security_group.side_effect = (
get_security_group_mock)
mock.patch(
'neutron_lib.plugins.directory.get_plugin',
return_value=self.plugin_mock).start()
@@ -539,6 +556,17 @@ class ProjectManagerSecurityGroupRuleTests(AdminSecurityGroupRuleTests):
policy.enforce,
self.context, 'create_security_group_rule', self.alt_target)
# Test for the SG_OWNER different then current user case:
target = copy.copy(self.target)
target['security_group_id'] = self.alt_sg['id']
target['ext_parent:tenant_id'] = self.alt_sg['tenant_id']
target['ext_parent_security_group_id'] = self.alt_sg['id']
self.plugin_mock.get_security_group.return_value = self.alt_sg
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_security_group_rule', target)
def test_create_security_group_rule_default_sg(self):
self.override_create_security_group_rule()
self.assertRaises(
@@ -563,11 +591,23 @@ class ProjectManagerSecurityGroupRuleTests(AdminSecurityGroupRuleTests):
self.assertTrue(
policy.enforce(self.context,
'delete_security_group_rule', self.target))
self.plugin_mock.get_security_group.return_value = self.alt_sg
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_group_rule', self.alt_target)
# Test for the SG_OWNER different then current user case:
target = copy.copy(self.target)
target['security_group_id'] = self.alt_sg['id']
target['ext_parent:tenant_id'] = self.alt_sg['tenant_id']
target['ext_parent_security_group_id'] = self.alt_sg['id']
self.plugin_mock.get_security_group.return_value = self.alt_sg
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_group_rule', target)
def test_delete_security_group_rule_default_sg(self):
self.override_delete_security_group_rule()
self.assertRaises(
@@ -603,6 +643,16 @@ class ProjectReaderSecurityGroupRuleTests(ProjectMemberSecurityGroupRuleTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_security_group_rule', self.alt_target)
# Test for the SG_OWNER different then current user case:
target = copy.copy(self.target)
target['security_group_id'] = self.alt_sg['id']
target['ext_parent:tenant_id'] = self.alt_sg['tenant_id']
target['ext_parent_security_group_id'] = self.alt_sg['id']
self.plugin_mock.get_security_group.return_value = self.alt_sg
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_security_group_rule', target)
def test_delete_security_group_rule(self):
self.assertRaises(
@@ -613,6 +663,16 @@ class ProjectReaderSecurityGroupRuleTests(ProjectMemberSecurityGroupRuleTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_group_rule', self.alt_target)
# Test for the SG_OWNER different then current user case:
target = copy.copy(self.target)
target['security_group_id'] = self.alt_sg['id']
target['ext_parent:tenant_id'] = self.alt_sg['tenant_id']
target['ext_parent_security_group_id'] = self.alt_sg['id']
self.plugin_mock.get_security_group.return_value = self.alt_sg
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_group_rule', target)
class ServiceRoleSecurityGroupRuleTests(SecurityGroupRuleAPITestCase):

View File

@@ -1350,7 +1350,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
port_range_max,
remote_ip_prefix,
remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule, as_admin=True)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
@@ -1383,7 +1383,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
tenant_id='bad_tenant',
set_context=True)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
self.assertEqual(webob.exc.HTTPForbidden.code, res.status_int)
def test_create_security_group_rule_bad_tenant_remote_group_id(self):
with self.security_group() as sg:
@@ -1424,7 +1424,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
tenant_id='bad_tenant',
set_context=True)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
self.assertEqual(webob.exc.HTTPForbidden.code, res.status_int)
def test_create_security_group_rule_bad_remote_group_id(self):
name = 'webservers'
@@ -2146,7 +2146,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
port_range_max,
remote_ip_prefix,
remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule, as_admin=True)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)

View File

@@ -26,7 +26,6 @@ from neutron_lib import exceptions
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_policy import fixture as op_fixture
from oslo_policy import policy as oslo_policy
from oslo_serialization import jsonutils
@@ -768,14 +767,16 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_retryrequest_on_notfound(self):
failure = exceptions.NetworkNotFound(net_id='whatever')
action = "create_port:mac"
with mock.patch.object(directory.get_plugin(),
'get_network', side_effect=failure):
with mock.patch.object(
directory.get_plugin(),
'get_network', side_effect=failure) as get_network_mock:
target = {'network_id': 'whatever'}
try:
policy.enforce(self.context, action, target)
self.fail("Did not raise RetryRequest")
except db_exc.RetryRequest as e:
self.assertEqual(failure, e.inner_exc)
self.fail("Did not raise NotFound exception and retry "
"DB request.")
except exceptions.NetworkNotFound:
self.assertEqual(2, get_network_mock.call_count)
def test_enforce_tenant_id_check_parent_resource_bw_compatibility(self):