From bd4c291cdfb5a75976b1f846f6d7ca88d2d154e9 Mon Sep 17 00:00:00 2001 From: Doug Wiegley Date: Tue, 29 Jan 2019 09:33:37 -0700 Subject: [PATCH] Restore tenant_id check on security group rule adds to previous semantic We switched from swapping the tenant_id in the context to explicitly checking the db column. Switch back, and a test that checks for not breaking this rather odd behavior. At least, until we decide to fix it as a bug. Change-Id: I6af4d414b1972e14692a8356ef95db7323e3a09a --- neutron/db/securitygroups_db.py | 18 +++++-- .../unit/extensions/test_securitygroup.py | 48 +++++++++++++++++++ 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/neutron/db/securitygroups_db.py b/neutron/db/securitygroups_db.py index 2c78f51f110..a03278723f9 100644 --- a/neutron/db/securitygroups_db.py +++ b/neutron/db/securitygroups_db.py @@ -192,9 +192,17 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase): raise ext_sg.SecurityGroupNotFound(id=id) return sg - def _check_security_group(self, context, id, **kwargs): - if not sg_obj.SecurityGroup.objects_exist(context, id=id, **kwargs): - raise ext_sg.SecurityGroupNotFound(id=id) + def _check_security_group(self, context, id, tenant_id=None): + if tenant_id: + tmp_context_tenant_id = context.tenant_id + context.tenant_id = tenant_id + + try: + if not sg_obj.SecurityGroup.objects_exist(context, id=id): + raise ext_sg.SecurityGroupNotFound(id=id) + finally: + if tenant_id: + context.tenant_id = tmp_context_tenant_id @db_api.retry_if_session_inactive() def delete_security_group(self, context, id): @@ -528,14 +536,14 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase): # Check that remote_group_id exists for tenant if remote_group_id: self._check_security_group(context, remote_group_id, - project_id=rule['tenant_id']) + tenant_id=rule['tenant_id']) security_group_id = rule['security_group_id'] # Confirm that the tenant has permission # to add rules to this security group. self._check_security_group(context, security_group_id, - project_id=rule['tenant_id']) + tenant_id=rule['tenant_id']) return security_group_id def _validate_security_group_rules(self, context, security_group_rules): diff --git a/neutron/tests/unit/extensions/test_securitygroup.py b/neutron/tests/unit/extensions/test_securitygroup.py index c71bcc24c7d..295840cc821 100644 --- a/neutron/tests/unit/extensions/test_securitygroup.py +++ b/neutron/tests/unit/extensions/test_securitygroup.py @@ -128,6 +128,10 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): # create a specific auth context for this request security_group_rule_req.environ['neutron.context'] = ( context.Context('', kwargs['tenant_id'])) + elif kwargs.get('admin_context'): + security_group_rule_req.environ['neutron.context'] = ( + context.Context(user_id='admin', tenant_id='admin-tenant', + is_admin=True)) return security_group_rule_req.get_response(self.ext_api) def _make_security_group(self, fmt, name, description, **kwargs): @@ -699,6 +703,50 @@ class TestSecurityGroups(SecurityGroupDBTestCase): for k, v, in keys: self.assertEqual(sg_rule[0][k], v) + # This test case checks that admins from a different tenant can add rules + # as themselves. This is an odd behavior, with some weird GET semantics, + # but this test is checking that we don't break that old behavior, at least + # until we make a conscious choice to do so. + def test_create_security_group_rules_admin_tenant(self): + name = 'webservers' + description = 'my webservers' + with self.security_group(name, description) as sg: + # Add a couple normal rules + rule = self._build_security_group_rule( + sg['security_group']['id'], "ingress", const.PROTO_NAME_TCP, + port_range_min=22, port_range_max=22, + remote_ip_prefix="10.0.0.0/24", + ethertype=const.IPv4) + self._make_security_group_rule(self.fmt, rule) + + rule = self._build_security_group_rule( + sg['security_group']['id'], "ingress", const.PROTO_NAME_TCP, + port_range_min=22, port_range_max=22, + remote_ip_prefix="10.0.1.0/24", + ethertype=const.IPv4) + self._make_security_group_rule(self.fmt, rule) + + # Let's add a rule as admin, with a different tenant_id. The + # results of this call are arguably a bug, but it is past behavior. + rule = self._build_security_group_rule( + sg['security_group']['id'], "ingress", const.PROTO_NAME_TCP, + port_range_min=22, port_range_max=22, + remote_ip_prefix="10.0.2.0/24", + ethertype=const.IPv4, + tenant_id='admin-tenant') + self._make_security_group_rule(self.fmt, rule, admin_context=True) + + # Now, let's make sure all the rules are there, with their odd + # tenant_id behavior. + res = self.new_list_request('security-groups') + sgs = self.deserialize(self.fmt, res.get_response(self.ext_api)) + for sg in sgs['security_groups']: + if sg['name'] == "webservers": + rules = sg['security_group_rules'] + self.assertEqual(len(rules), 5) + self.assertNotEqual(rules[3]['tenant_id'], 'admin-tenant') + self.assertEqual(rules[4]['tenant_id'], 'admin-tenant') + def test_get_security_group_on_port_from_wrong_tenant(self): plugin = directory.get_plugin() if not hasattr(plugin, '_get_security_groups_on_port'):