Browse Source

Merge "Fix port can not be created with the sg of other project" into stable/train

changes/59/748959/1
Zuul 1 month ago
committed by Gerrit Code Review
parent
commit
efc89af6f6
5 changed files with 123 additions and 5 deletions
  1. +3
    -0
      doc/source/admin/archives/adv-features.rst
  2. +2
    -1
      neutron/db/securitygroups_db.py
  3. +3
    -3
      neutron/tests/unit/db/test_db_base_plugin_v2.py
  4. +99
    -0
      neutron/tests/unit/extensions/test_portsecurity.py
  5. +16
    -1
      neutron/tests/unit/extensions/test_securitygroup.py

+ 3
- 0
doc/source/admin/archives/adv-features.rst View File

@@ -350,6 +350,9 @@ APIs at the same time.
this is that Compute security group APIs are instances based and
not port based as Networking.

- When creating or updating a port with a specified security group,
the admin tenant can use the security groups of other tenants.

Basic security group operations
-------------------------------



+ 2
- 1
neutron/db/securitygroups_db.py View File

@@ -865,7 +865,8 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,

valid_groups = set(
g.id for g in sg_objs
if (not tenant_id or g.tenant_id == tenant_id or
if (context.is_admin or not tenant_id or
g.tenant_id == tenant_id or
sg_obj.SecurityGroup.is_shared_with_tenant(
context, g.id, tenant_id))
)


+ 3
- 3
neutron/tests/unit/db/test_db_base_plugin_v2.py View File

@@ -441,8 +441,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
return subnetpool_res

def _create_port(self, fmt, net_id, expected_res_status=None,
arg_list=None, set_context=False, tenant_id=None,
**kwargs):
arg_list=None, set_context=False, is_admin=False,
tenant_id=None, **kwargs):
tenant_id = tenant_id or self._tenant_id
data = {'port': {'network_id': net_id,
'tenant_id': tenant_id}}
@@ -466,7 +466,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
if set_context and tenant_id:
# create a specific auth context for this request
port_req.environ['neutron.context'] = context.Context(
'', tenant_id)
'', tenant_id, is_admin=is_admin)

port_res = port_req.get_response(self.api)
if expected_res_status:


+ 99
- 0
neutron/tests/unit/extensions/test_portsecurity.py View File

@@ -292,6 +292,57 @@ class TestPortSecurity(PortSecurityDBTestCase):
self.assertEqual(port['port']['security_groups'], [security_group_id])
self._delete('ports', port['port']['id'])

def test_create_port_with_admin_use_other_tenant_security_group(self):
if self._skip_security_group:
self.skipTest("Plugin does not support security groups")
res = self._create_network('json', 'net1', True,
arg_list=('port_security_enabled',),
set_context=True,
tenant_id='admin_tenant',
port_security_enabled=False)
net = self.deserialize('json', res)
self._create_subnet('json', net['network']['id'], '10.0.0.0/24')
security_group = self.deserialize(
'json', self._create_security_group(self.fmt, 'asdf', 'asdf',
tenant_id='other_tenant'))
security_group_id = security_group['security_group']['id']
res = self._create_port('json', net['network']['id'],
arg_list=('security_groups',
'port_security_enabled'),
set_context=True,
is_admin=True,
tenant_id='admin_tenant',
port_security_enabled=True,
security_groups=[security_group_id])
port = self.deserialize('json', res)
self.assertTrue(port['port'][psec.PORTSECURITY])
self.assertEqual(port['port']['security_groups'], [security_group_id])
self._delete('ports', port['port']['id'])

def test_create_port_with_no_admin_use_other_tenant_security_group(self):
if self._skip_security_group:
self.skipTest("Plugin does not support security groups")
res = self._create_network('json', 'net1', True,
arg_list=('port_security_enabled',),
set_context=True,
tenant_id='demo_tenant',
port_security_enabled=False)
net = self.deserialize('json', res)
self._create_subnet('json', net['network']['id'], '10.0.0.0/24',
set_context=True, tenant_id='demo_tenant')
security_group = self.deserialize(
'json', self._create_security_group(self.fmt, 'asdf', 'asdf',
tenant_id='other_tenant'))
security_group_id = security_group['security_group']['id']
res = self._create_port('json', net['network']['id'],
arg_list=('security_groups',
'port_security_enabled'),
set_context=True,
tenant_id='demo_tenant',
port_security_enabled=True,
security_groups=[security_group_id])
self.assertEqual(404, res.status_int)

def test_create_port_without_security_group_and_net_sec_false(self):
res = self._create_network('json', 'net1', True,
arg_list=('port_security_enabled',),
@@ -325,6 +376,54 @@ class TestPortSecurity(PortSecurityDBTestCase):
self.deserialize('json', req.get_response(self.api))
self._delete('ports', port['port']['id'])

def test_update_port_with_admin_use_other_tenant_security_group(self):
if self._skip_security_group:
self.skipTest("Plugin does not support security groups")
with self.network() as net:
with self.subnet(network=net):
res = self._create_port('json', net['network']['id'],
set_context=True, is_admin=True,
tenant_id='admin_tenant',)
port = self.deserialize('json', res)
self.assertTrue(port['port'][psec.PORTSECURITY])

security_group = self.deserialize('json',
self._create_security_group(self.fmt, 'asdf', 'asdf',
tenant_id='other_tenant'))
security_group_id = security_group['security_group']['id']
update_port = {'port':
{'security_groups': [security_group_id]}}
req = self.new_update_request('ports', update_port,
port['port']['id'])
port = self.deserialize('json', req.get_response(self.api))
security_groups = port['port']['security_groups']
self.assertIn(security_group_id, security_groups)
self._delete('ports', port['port']['id'])

def test_update_port_with_no_admin_use_other_tenant_security_group(self):
if self._skip_security_group:
self.skipTest("Plugin does not support security groups")
with self.network(tenant_id='demo_tenant') as net:
with self.subnet(network=net, tenant_id='demo_tenant'):
res = self._create_port('json', net['network']['id'],
set_context=True,
tenant_id='demo_tenant',)
port = self.deserialize('json', res)
self.assertTrue(port['port'][psec.PORTSECURITY])

security_group = self.deserialize('json',
self._create_security_group(self.fmt, 'asdf', 'asdf',
tenant_id='other_tenant'))
security_group_id = security_group['security_group']['id']
update_port = {'port':
{'security_groups': [security_group_id]}}
req = self.new_update_request('ports', update_port,
port['port']['id'])
req.environ['neutron.context'] = context.Context(
'', 'other_tenant')
res = req.get_response(self.api)
self.assertEqual(404, res.status_int)

def test_update_port_remove_port_security_security_group(self):
if self._skip_security_group:
self.skipTest("Plugin does not support security groups")


+ 16
- 1
neutron/tests/unit/extensions/test_securitygroup.py View File

@@ -787,7 +787,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
plugin = directory.get_plugin()
if not hasattr(plugin, '_get_security_groups_on_port'):
self.skipTest("plugin doesn't use the mixin with this method")
neutron_context = context.get_admin_context()
neutron_context = context.Context('user', 'tenant')
res = self._create_security_group(self.fmt, 'webservers', 'webservers',
tenant_id='bad_tenant')
sg1 = self.deserialize(self.fmt, res)
@@ -798,6 +798,21 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
'tenant_id': 'tenant'}}
)

def test_get_security_group_on_port_with_admin_from_other_tenant(self):
plugin = directory.get_plugin()
if not hasattr(plugin, '_get_security_groups_on_port'):
self.skipTest("plugin doesn't use the mixin with this method")
neutron_context = context.get_admin_context()
res = self._create_security_group(self.fmt, 'webservers', 'webservers',
tenant_id='other_tenant')
sg1 = self.deserialize(self.fmt, res)
sgs = plugin._get_security_groups_on_port(
neutron_context,
{'port': {'security_groups': [sg1['security_group']['id']],
'tenant_id': 'tenant'}})
sg1_id = sg1['security_group']['id']
self.assertEqual(sg1_id, sgs[0])

def test_delete_security_group(self):
name = 'webservers'
description = 'my webservers'


Loading…
Cancel
Save