Create a policy rule to control if a rule belongs to the default SG

The policy rule ``shared_security_group`` allows to create new policy
rules checking if a security group rule belongs or not to the project
default security group.

By default the behaviour has not changed. If an administrator wants
to prevent a non-privileged user from creating or deleting rules in the
default security group, the ``create_security_group_rule`` and
``delete_security_group_rule`` can be overriden. An example is provided
in the unit tests.

Closes-Bug: #2019960

Change-Id: I6c90b61df0e726ef07f177801069baf30c49de67
This commit is contained in:
Rodolfo Alonso Hernandez 2023-10-03 14:34:55 +00:00 committed by Rodolfo Alonso
parent 1aa1f2f9cb
commit 96223931ca
7 changed files with 110 additions and 0 deletions

View File

@ -17,6 +17,8 @@ import collections
import copy
from neutron_lib.api import attributes
from neutron_lib.api.definitions import \
security_groups_rules_belongs_to_default_sg as sg_rule_default
from neutron_lib.api import faults
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
@ -467,6 +469,7 @@ class Controller(object):
for item in items:
self._validate_network_tenant_ownership(request,
item[self._resource])
self._belongs_to_default_sg(request, item[self._resource])
# For ext resources policy check, we support two types, such as
# parent_id is in request body, another type is parent_id is in
# request url, which we can get from kwargs.
@ -830,6 +833,16 @@ class Controller(object):
"%s_%s" % (constants.EXT_PARENT_PREFIX, self._parent_id_name),
parent_id)
def _belongs_to_default_sg(self, request, resource_item):
"""Add the SG default flag to the SG rules during the creation"""
if self._resource != 'security_group_rule':
return
default_sg_id = self._plugin.get_default_security_group(
request.context.elevated(), resource_item['project_id'])
resource_item[sg_rule_default.BELONGS_TO_DEFAULT_SG] = (
default_sg_id == resource_item['security_group_id'])
def create_resource(collection, resource, plugin, params, allow_bulk=False,
member_actions=None, parent=None, allow_pagination=False,

View File

@ -48,6 +48,13 @@ rules = [
check_str='field:security_groups:shared=True',
description='Definition of a shared security group'
),
policy.RuleDefault(
name='rule_default_sg',
check_str='field:security_group_rules:belongs_to_default_sg=True',
description='Definition of a security group rule that belongs to the '
'project default security group'
),
# TODO(amotoki): admin_or_owner is the right rule?
# Does an empty string make more sense for create_security_group?
policy.DocumentedRuleDefault(

View File

@ -227,6 +227,13 @@ class SecurityGroupDbMixin(
if tenant_id:
context.tenant_id = tmp_context_tenant_id
@db_api.retry_if_session_inactive()
def get_default_security_group(self, context, project_id):
default_sg = sg_obj.DefaultSecurityGroup.get_object(
context, project_id=project_id)
if default_sg:
return default_sg.security_group_id
@db_api.retry_if_session_inactive()
def delete_security_group(self, context, id):
filters = {'security_group_id': [id]}

View File

@ -380,6 +380,10 @@ class SecurityGroupPluginBase(object, metaclass=abc.ABCMeta):
def get_security_group(self, context, id, fields=None):
pass
@abc.abstractmethod
def get_default_security_group(self, context, project_id):
pass
@abc.abstractmethod
def create_security_group_rule(self, context, security_group_rule):
pass

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import tempfile
import warnings
from neutron_lib import context
@ -130,3 +131,16 @@ class RuleScopesTestCase(PolicyBaseTestCase):
self.assertListEqual(expected_scope_types,
rule.scope_types,
fail_msg)
def write_policies(policies):
env_path = tempfile.mkdtemp(prefix='policy_test_', dir='/tmp/')
with tempfile.NamedTemporaryFile('w+', dir=env_path,
delete=False) as policy_file:
policy_file.write(str(policies))
return policy_file.name
def reload_policies(policy_file):
policy.reset()
policy.init(policy_file=policy_file, suppress_deprecation_warnings=True)

View File

@ -256,6 +256,24 @@ class SecurityGroupRuleAPITestCase(base.PolicyBaseTestCase):
'neutron_lib.plugins.directory.get_plugin',
return_value=self.plugin_mock).start()
def override_create_security_group_rule(self):
self._override_security_group_rule('create_security_group_rule')
def override_delete_security_group_rule(self):
self._override_security_group_rule('delete_security_group_rule')
def _override_security_group_rule(self, rule_name):
# Admin or (member and not default SG) --> only admin can perform the
# ``rule_name`` action in the default SG.
rule = {rule_name:
'role:admin or (role:member and project_id:%(project_id)s '
'and not rule:rule_default_sg)'}
policy_file = base.write_policies(rule)
self.target['belongs_to_default_sg'] = 'True'
base.reload_policies(policy_file)
self.plugin_mock.get_default_security_group.return_value = (
self.sg['id'])
class SystemAdminSecurityGroupRuleTests(SecurityGroupRuleAPITestCase):
@ -322,6 +340,15 @@ class AdminSecurityGroupRuleTests(SecurityGroupRuleAPITestCase):
policy.enforce(self.context,
'create_security_group_rule', self.alt_target))
def test_create_security_group_rule_default_sg(self):
self.override_create_security_group_rule()
self.assertTrue(
policy.enforce(self.context,
'create_security_group_rule', self.target))
self.assertTrue(
policy.enforce(self.context,
'create_security_group_rule', self.alt_target))
def test_get_security_group_rule(self):
self.assertTrue(
policy.enforce(self.context,
@ -338,6 +365,15 @@ class AdminSecurityGroupRuleTests(SecurityGroupRuleAPITestCase):
policy.enforce(self.context,
'delete_security_group_rule', self.alt_target))
def test_delete_security_group_rule_default_sg(self):
self.override_delete_security_group_rule()
self.assertTrue(
policy.enforce(self.context,
'delete_security_group_rule', self.target))
self.assertTrue(
policy.enforce(self.context,
'delete_security_group_rule', self.alt_target))
class ProjectMemberSecurityGroupRuleTests(AdminSecurityGroupRuleTests):
@ -354,6 +390,17 @@ class ProjectMemberSecurityGroupRuleTests(AdminSecurityGroupRuleTests):
policy.enforce,
self.context, 'create_security_group_rule', self.alt_target)
def test_create_security_group_rule_default_sg(self):
self.override_create_security_group_rule()
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_security_group_rule', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_security_group_rule', self.alt_target)
def test_get_security_group_rule(self):
self.assertTrue(
policy.enforce(self.context,
@ -383,6 +430,17 @@ class ProjectMemberSecurityGroupRuleTests(AdminSecurityGroupRuleTests):
policy.enforce,
self.context, 'delete_security_group_rule', self.alt_target)
def test_delete_security_group_rule_default_sg(self):
self.override_delete_security_group_rule()
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_group_rule', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_group_rule', self.alt_target)
class ProjectReaderSecurityGroupRuleTests(ProjectMemberSecurityGroupRuleTests):

View File

@ -0,0 +1,7 @@
---
features:
- |
A new policy rule check ``rule_default_sg`` has been added. This rule
allows to check if a security group rule belongs or not to the project
default security group. The administrator can override the rule creation
and rule deletion, disallowing a non-privileged user from these actions.