NSX|P: Fix removal of provider security groups from port
Change-Id: I74e39328858fe91515f7796d3bef80de934843c4
This commit is contained in:
parent
e445ead816
commit
e11acdcd09
|
@ -1131,6 +1131,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
|
|||
# for notifications
|
||||
original_port = super(NsxPolicyPlugin, self).get_port(
|
||||
context, port_id)
|
||||
self._remove_provider_security_groups_from_list(original_port)
|
||||
port_data = port['port']
|
||||
self._validate_update_port(context, port_id, original_port,
|
||||
port_data)
|
||||
|
|
|
@ -47,6 +47,7 @@ from neutron_lib.objects import registry as obj_reg
|
|||
from neutron_lib.plugins import directory
|
||||
|
||||
from vmware_nsx.common import utils
|
||||
from vmware_nsx.extensions import providersecuritygroup as provider_sg
|
||||
from vmware_nsx.plugins.nsx_p import plugin as nsx_plugin
|
||||
from vmware_nsx.tests import unit as vmware
|
||||
from vmware_nsx.tests.unit.common_plugin import common_v3
|
||||
|
@ -1278,6 +1279,9 @@ class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest,
|
|||
super(NsxPTestSecurityGroup, self).setUp(plugin=plugin,
|
||||
ext_mgr=ext_mgr)
|
||||
self.project_id = test_db_base_plugin_v2.TEST_TENANT_ID
|
||||
# add provider group attributes
|
||||
secgrp.Securitygroup().update_attributes_map(
|
||||
provider_sg.EXTENDED_ATTRIBUTES_2_0)
|
||||
|
||||
def test_create_security_group_rule_icmp_with_type_and_code(self):
|
||||
"""No non-zero icmp codes are currently supported by the NSX"""
|
||||
|
@ -1354,6 +1358,66 @@ class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest,
|
|||
tags=mock.ANY,
|
||||
category=policy_constants.CATEGORY_ENVIRONMENT)
|
||||
|
||||
def _create_provider_security_group(self):
|
||||
body = {'security_group': {'name': 'provider-deny',
|
||||
'tenant_id': self._tenant_id,
|
||||
'description': 'provider sg',
|
||||
'provider': True}}
|
||||
security_group_req = self.new_create_request('security-groups', body)
|
||||
return self.deserialize(self.fmt,
|
||||
security_group_req.get_response(self.ext_api))
|
||||
|
||||
def test_provider_sg_on_port(self):
|
||||
psg = self._create_provider_security_group()
|
||||
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
||||
"NsxPolicySegmentPortApi.create_or_overwrite"
|
||||
) as port_create:
|
||||
with self.port(tenant_id=self._tenant_id) as port:
|
||||
# make sure the port has the provider sg
|
||||
port_data = port['port']
|
||||
self.assertEqual(1, len(port_data['provider_security_groups']))
|
||||
self.assertEqual(psg['security_group']['id'],
|
||||
port_data['provider_security_groups'][0])
|
||||
|
||||
# Make sure the correct security groups tags were set
|
||||
port_create.assert_called_once()
|
||||
actual_tags = port_create.call_args[1]['tags']
|
||||
sg_tags = 0
|
||||
psg_tag_found = False
|
||||
for tag in actual_tags:
|
||||
if tag['scope'] == 'os-security-group':
|
||||
sg_tags += 1
|
||||
if tag['tag'] == psg['security_group']['id']:
|
||||
psg_tag_found = True
|
||||
self.assertEqual(2, sg_tags)
|
||||
self.assertTrue(psg_tag_found)
|
||||
|
||||
def test_remove_provider_sg_from_port(self):
|
||||
psg = self._create_provider_security_group()
|
||||
with self.port(tenant_id=self._tenant_id) as port:
|
||||
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
||||
"NsxPolicySegmentPortApi.create_or_overwrite"
|
||||
) as port_update:
|
||||
# specifically remove the provider sg from the port
|
||||
data = {'port': {'provider_security_groups': []}}
|
||||
req = self.new_update_request('ports',
|
||||
data, port['port']['id'])
|
||||
res = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEqual(0,
|
||||
len(res['port']['provider_security_groups']))
|
||||
# Make sure the correct security groups tags were set
|
||||
port_update.assert_called_once()
|
||||
actual_tags = port_update.call_args[1]['tags']
|
||||
sg_tags = 0
|
||||
psg_tag_found = False
|
||||
for tag in actual_tags:
|
||||
if tag['scope'] == 'os-security-group':
|
||||
sg_tags += 1
|
||||
if tag['tag'] == psg['security_group']['id']:
|
||||
psg_tag_found = True
|
||||
self.assertEqual(1, sg_tags)
|
||||
self.assertFalse(psg_tag_found)
|
||||
|
||||
def test_sg_rule_create_on_nsx(self):
|
||||
"""Verify that a comm-map entry is created for a new SG rule """
|
||||
name = description = 'sg1'
|
||||
|
|
Loading…
Reference in New Issue