Restore tenant_id check on security group rule adds to previous semantic

We switched from swapping the tenant_id in the context to explicitly
checking the db column. Switch back, and a test that checks for
not breaking this rather odd behavior. At least, until we decide
to fix it as a bug.

Change-Id: I6af4d414b1972e14692a8356ef95db7323e3a09a
This commit is contained in:
Doug Wiegley 2019-01-29 09:33:37 -07:00 committed by Doug Wiegley
parent 8914f8247f
commit bd4c291cdf
2 changed files with 61 additions and 5 deletions

View File

@ -192,9 +192,17 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
raise ext_sg.SecurityGroupNotFound(id=id) raise ext_sg.SecurityGroupNotFound(id=id)
return sg return sg
def _check_security_group(self, context, id, **kwargs): def _check_security_group(self, context, id, tenant_id=None):
if not sg_obj.SecurityGroup.objects_exist(context, id=id, **kwargs): if tenant_id:
tmp_context_tenant_id = context.tenant_id
context.tenant_id = tenant_id
try:
if not sg_obj.SecurityGroup.objects_exist(context, id=id):
raise ext_sg.SecurityGroupNotFound(id=id) raise ext_sg.SecurityGroupNotFound(id=id)
finally:
if tenant_id:
context.tenant_id = tmp_context_tenant_id
@db_api.retry_if_session_inactive() @db_api.retry_if_session_inactive()
def delete_security_group(self, context, id): def delete_security_group(self, context, id):
@ -528,14 +536,14 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
# Check that remote_group_id exists for tenant # Check that remote_group_id exists for tenant
if remote_group_id: if remote_group_id:
self._check_security_group(context, remote_group_id, self._check_security_group(context, remote_group_id,
project_id=rule['tenant_id']) tenant_id=rule['tenant_id'])
security_group_id = rule['security_group_id'] security_group_id = rule['security_group_id']
# Confirm that the tenant has permission # Confirm that the tenant has permission
# to add rules to this security group. # to add rules to this security group.
self._check_security_group(context, security_group_id, self._check_security_group(context, security_group_id,
project_id=rule['tenant_id']) tenant_id=rule['tenant_id'])
return security_group_id return security_group_id
def _validate_security_group_rules(self, context, security_group_rules): def _validate_security_group_rules(self, context, security_group_rules):

View File

@ -128,6 +128,10 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
# create a specific auth context for this request # create a specific auth context for this request
security_group_rule_req.environ['neutron.context'] = ( security_group_rule_req.environ['neutron.context'] = (
context.Context('', kwargs['tenant_id'])) context.Context('', kwargs['tenant_id']))
elif kwargs.get('admin_context'):
security_group_rule_req.environ['neutron.context'] = (
context.Context(user_id='admin', tenant_id='admin-tenant',
is_admin=True))
return security_group_rule_req.get_response(self.ext_api) return security_group_rule_req.get_response(self.ext_api)
def _make_security_group(self, fmt, name, description, **kwargs): def _make_security_group(self, fmt, name, description, **kwargs):
@ -699,6 +703,50 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
for k, v, in keys: for k, v, in keys:
self.assertEqual(sg_rule[0][k], v) self.assertEqual(sg_rule[0][k], v)
# This test case checks that admins from a different tenant can add rules
# as themselves. This is an odd behavior, with some weird GET semantics,
# but this test is checking that we don't break that old behavior, at least
# until we make a conscious choice to do so.
def test_create_security_group_rules_admin_tenant(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
# Add a couple normal rules
rule = self._build_security_group_rule(
sg['security_group']['id'], "ingress", const.PROTO_NAME_TCP,
port_range_min=22, port_range_max=22,
remote_ip_prefix="10.0.0.0/24",
ethertype=const.IPv4)
self._make_security_group_rule(self.fmt, rule)
rule = self._build_security_group_rule(
sg['security_group']['id'], "ingress", const.PROTO_NAME_TCP,
port_range_min=22, port_range_max=22,
remote_ip_prefix="10.0.1.0/24",
ethertype=const.IPv4)
self._make_security_group_rule(self.fmt, rule)
# Let's add a rule as admin, with a different tenant_id. The
# results of this call are arguably a bug, but it is past behavior.
rule = self._build_security_group_rule(
sg['security_group']['id'], "ingress", const.PROTO_NAME_TCP,
port_range_min=22, port_range_max=22,
remote_ip_prefix="10.0.2.0/24",
ethertype=const.IPv4,
tenant_id='admin-tenant')
self._make_security_group_rule(self.fmt, rule, admin_context=True)
# Now, let's make sure all the rules are there, with their odd
# tenant_id behavior.
res = self.new_list_request('security-groups')
sgs = self.deserialize(self.fmt, res.get_response(self.ext_api))
for sg in sgs['security_groups']:
if sg['name'] == "webservers":
rules = sg['security_group_rules']
self.assertEqual(len(rules), 5)
self.assertNotEqual(rules[3]['tenant_id'], 'admin-tenant')
self.assertEqual(rules[4]['tenant_id'], 'admin-tenant')
def test_get_security_group_on_port_from_wrong_tenant(self): def test_get_security_group_on_port_from_wrong_tenant(self):
plugin = directory.get_plugin() plugin = directory.get_plugin()
if not hasattr(plugin, '_get_security_groups_on_port'): if not hasattr(plugin, '_get_security_groups_on_port'):