From 96223931cae782a997271c17ea8092ed277d2db3 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Tue, 3 Oct 2023 14:34:55 +0000 Subject: [PATCH] Create a policy rule to control if a rule belongs to the default SG The policy rule ``shared_security_group`` allows to create new policy rules checking if a security group rule belongs or not to the project default security group. By default the behaviour has not changed. If an administrator wants to prevent a non-privileged user from creating or deleting rules in the default security group, the ``create_security_group_rule`` and ``delete_security_group_rule`` can be overriden. An example is provided in the unit tests. Closes-Bug: #2019960 Change-Id: I6c90b61df0e726ef07f177801069baf30c49de67 --- neutron/api/v2/base.py | 13 +++++ neutron/conf/policies/security_group.py | 7 +++ neutron/db/securitygroups_db.py | 7 +++ neutron/extensions/securitygroup.py | 4 ++ neutron/tests/unit/conf/policies/test_base.py | 14 +++++ .../unit/conf/policies/test_security_group.py | 58 +++++++++++++++++++ ...group-rules-policies-b6e350477c88edd8.yaml | 7 +++ 7 files changed, 110 insertions(+) create mode 100644 releasenotes/notes/default-security-group-rules-policies-b6e350477c88edd8.yaml diff --git a/neutron/api/v2/base.py b/neutron/api/v2/base.py index 4dffbed7c43..0660b177147 100644 --- a/neutron/api/v2/base.py +++ b/neutron/api/v2/base.py @@ -17,6 +17,8 @@ import collections import copy from neutron_lib.api import attributes +from neutron_lib.api.definitions import \ + security_groups_rules_belongs_to_default_sg as sg_rule_default from neutron_lib.api import faults from neutron_lib.callbacks import events from neutron_lib.callbacks import registry @@ -467,6 +469,7 @@ class Controller(object): for item in items: self._validate_network_tenant_ownership(request, item[self._resource]) + self._belongs_to_default_sg(request, item[self._resource]) # For ext resources policy check, we support two types, such as # parent_id is in request body, another type is parent_id is in # request url, which we can get from kwargs. @@ -830,6 +833,16 @@ class Controller(object): "%s_%s" % (constants.EXT_PARENT_PREFIX, self._parent_id_name), parent_id) + def _belongs_to_default_sg(self, request, resource_item): + """Add the SG default flag to the SG rules during the creation""" + if self._resource != 'security_group_rule': + return + + default_sg_id = self._plugin.get_default_security_group( + request.context.elevated(), resource_item['project_id']) + resource_item[sg_rule_default.BELONGS_TO_DEFAULT_SG] = ( + default_sg_id == resource_item['security_group_id']) + def create_resource(collection, resource, plugin, params, allow_bulk=False, member_actions=None, parent=None, allow_pagination=False, diff --git a/neutron/conf/policies/security_group.py b/neutron/conf/policies/security_group.py index dbb7be72916..c1d80e9e2b5 100644 --- a/neutron/conf/policies/security_group.py +++ b/neutron/conf/policies/security_group.py @@ -48,6 +48,13 @@ rules = [ check_str='field:security_groups:shared=True', description='Definition of a shared security group' ), + policy.RuleDefault( + name='rule_default_sg', + check_str='field:security_group_rules:belongs_to_default_sg=True', + description='Definition of a security group rule that belongs to the ' + 'project default security group' + ), + # TODO(amotoki): admin_or_owner is the right rule? # Does an empty string make more sense for create_security_group? policy.DocumentedRuleDefault( diff --git a/neutron/db/securitygroups_db.py b/neutron/db/securitygroups_db.py index d44c79ab1ec..4f2c1377bc5 100644 --- a/neutron/db/securitygroups_db.py +++ b/neutron/db/securitygroups_db.py @@ -227,6 +227,13 @@ class SecurityGroupDbMixin( if tenant_id: context.tenant_id = tmp_context_tenant_id + @db_api.retry_if_session_inactive() + def get_default_security_group(self, context, project_id): + default_sg = sg_obj.DefaultSecurityGroup.get_object( + context, project_id=project_id) + if default_sg: + return default_sg.security_group_id + @db_api.retry_if_session_inactive() def delete_security_group(self, context, id): filters = {'security_group_id': [id]} diff --git a/neutron/extensions/securitygroup.py b/neutron/extensions/securitygroup.py index a20b703f038..8bf11d9de54 100644 --- a/neutron/extensions/securitygroup.py +++ b/neutron/extensions/securitygroup.py @@ -380,6 +380,10 @@ class SecurityGroupPluginBase(object, metaclass=abc.ABCMeta): def get_security_group(self, context, id, fields=None): pass + @abc.abstractmethod + def get_default_security_group(self, context, project_id): + pass + @abc.abstractmethod def create_security_group_rule(self, context, security_group_rule): pass diff --git a/neutron/tests/unit/conf/policies/test_base.py b/neutron/tests/unit/conf/policies/test_base.py index f2d16cdd8c1..eae85d0927a 100644 --- a/neutron/tests/unit/conf/policies/test_base.py +++ b/neutron/tests/unit/conf/policies/test_base.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import tempfile import warnings from neutron_lib import context @@ -130,3 +131,16 @@ class RuleScopesTestCase(PolicyBaseTestCase): self.assertListEqual(expected_scope_types, rule.scope_types, fail_msg) + + +def write_policies(policies): + env_path = tempfile.mkdtemp(prefix='policy_test_', dir='/tmp/') + with tempfile.NamedTemporaryFile('w+', dir=env_path, + delete=False) as policy_file: + policy_file.write(str(policies)) + return policy_file.name + + +def reload_policies(policy_file): + policy.reset() + policy.init(policy_file=policy_file, suppress_deprecation_warnings=True) diff --git a/neutron/tests/unit/conf/policies/test_security_group.py b/neutron/tests/unit/conf/policies/test_security_group.py index 160a22acde3..7aab4c9a558 100644 --- a/neutron/tests/unit/conf/policies/test_security_group.py +++ b/neutron/tests/unit/conf/policies/test_security_group.py @@ -256,6 +256,24 @@ class SecurityGroupRuleAPITestCase(base.PolicyBaseTestCase): 'neutron_lib.plugins.directory.get_plugin', return_value=self.plugin_mock).start() + def override_create_security_group_rule(self): + self._override_security_group_rule('create_security_group_rule') + + def override_delete_security_group_rule(self): + self._override_security_group_rule('delete_security_group_rule') + + def _override_security_group_rule(self, rule_name): + # Admin or (member and not default SG) --> only admin can perform the + # ``rule_name`` action in the default SG. + rule = {rule_name: + 'role:admin or (role:member and project_id:%(project_id)s ' + 'and not rule:rule_default_sg)'} + policy_file = base.write_policies(rule) + self.target['belongs_to_default_sg'] = 'True' + base.reload_policies(policy_file) + self.plugin_mock.get_default_security_group.return_value = ( + self.sg['id']) + class SystemAdminSecurityGroupRuleTests(SecurityGroupRuleAPITestCase): @@ -322,6 +340,15 @@ class AdminSecurityGroupRuleTests(SecurityGroupRuleAPITestCase): policy.enforce(self.context, 'create_security_group_rule', self.alt_target)) + def test_create_security_group_rule_default_sg(self): + self.override_create_security_group_rule() + self.assertTrue( + policy.enforce(self.context, + 'create_security_group_rule', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_security_group_rule', self.alt_target)) + def test_get_security_group_rule(self): self.assertTrue( policy.enforce(self.context, @@ -338,6 +365,15 @@ class AdminSecurityGroupRuleTests(SecurityGroupRuleAPITestCase): policy.enforce(self.context, 'delete_security_group_rule', self.alt_target)) + def test_delete_security_group_rule_default_sg(self): + self.override_delete_security_group_rule() + self.assertTrue( + policy.enforce(self.context, + 'delete_security_group_rule', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'delete_security_group_rule', self.alt_target)) + class ProjectMemberSecurityGroupRuleTests(AdminSecurityGroupRuleTests): @@ -354,6 +390,17 @@ class ProjectMemberSecurityGroupRuleTests(AdminSecurityGroupRuleTests): policy.enforce, self.context, 'create_security_group_rule', self.alt_target) + def test_create_security_group_rule_default_sg(self): + self.override_create_security_group_rule() + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_security_group_rule', self.target) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_security_group_rule', self.alt_target) + def test_get_security_group_rule(self): self.assertTrue( policy.enforce(self.context, @@ -383,6 +430,17 @@ class ProjectMemberSecurityGroupRuleTests(AdminSecurityGroupRuleTests): policy.enforce, self.context, 'delete_security_group_rule', self.alt_target) + def test_delete_security_group_rule_default_sg(self): + self.override_delete_security_group_rule() + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'delete_security_group_rule', self.target) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'delete_security_group_rule', self.alt_target) + class ProjectReaderSecurityGroupRuleTests(ProjectMemberSecurityGroupRuleTests): diff --git a/releasenotes/notes/default-security-group-rules-policies-b6e350477c88edd8.yaml b/releasenotes/notes/default-security-group-rules-policies-b6e350477c88edd8.yaml new file mode 100644 index 00000000000..a6720cf30ea --- /dev/null +++ b/releasenotes/notes/default-security-group-rules-policies-b6e350477c88edd8.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + A new policy rule check ``rule_default_sg`` has been added. This rule + allows to check if a security group rule belongs or not to the project + default security group. The administrator can override the rule creation + and rule deletion, disallowing a non-privileged user from these actions.